From 96fb592c57a5b58f6891ef72ae7bd153fd1753f9 Mon Sep 17 00:00:00 2001 From: John Ousterhout Date: Mon, 26 Jan 2026 11:15:32 -0800 Subject: [PATCH 1/4] Use homa_qdisc by default in CloudLab config --- cloudlab/bin/config | 1 + 1 file changed, 1 insertion(+) diff --git a/cloudlab/bin/config b/cloudlab/bin/config index 5ccf574f..56642a41 100755 --- a/cloudlab/bin/config +++ b/cloudlab/bin/config @@ -840,6 +840,7 @@ while i < len(sys.argv): config_nic() config_power() config_rps() + config_qdisc() elif arg == "ecn_threshold": if i >= len(sys.argv): raise Exception("No argument provided for 'ecn_threshold' command"); From 483ea798f82d351a056f0da1ba8d50e318675d45 Mon Sep 17 00:00:00 2001 From: John Ousterhout Date: Mon, 26 Jan 2026 11:17:45 -0800 Subject: [PATCH 2/4] Rework mechanism for initiating grant management of RPCs Delay starting management of grants for RPCs until homa_grant_check_rpc is invoked. --- homa_grant.c | 42 +++++++++++++++++++------------- homa_pool.c | 3 +-- homa_rpc.h | 5 ++-- test/unit_homa_grant.c | 54 ++++++++++++++++-------------------------- 4 files changed, 49 insertions(+), 55 deletions(-) diff --git a/homa_grant.c b/homa_grant.c index 1ea24a8d..80e90b78 100644 --- a/homa_grant.c +++ b/homa_grant.c @@ -140,7 +140,7 @@ void homa_grant_free(struct homa_grant *grant) /** * homa_grant_init_rpc() - Initialize grant-related information for an - * RPC's incoming message (may add the RPC to grant priority queues). + * RPC's incoming message. * @rpc: RPC being initialized. Grant-related fields in msgin * are assumed to be zero. Must be locked by caller. * @unsched: Number of unscheduled bytes in the incoming message for @rpc. @@ -149,18 +149,10 @@ void homa_grant_init_rpc(struct homa_rpc *rpc, int unsched) __must_hold(rpc->bucket->lock) { rpc->msgin.rank = -1; - if (unsched >= rpc->msgin.length) { - rpc->msgin.granted = rpc->msgin.length; - rpc->msgin.prev_grant = rpc->msgin.granted; - return; - } + if (unsched >= rpc->msgin.length) + unsched = rpc->msgin.length; rpc->msgin.granted = unsched; rpc->msgin.prev_grant = unsched; - if (rpc->msgin.num_bpages != 0) - /* Can't issue grants unless buffer space has been allocated - * for the message. - */ - homa_grant_manage_rpc(rpc); } /** @@ -443,7 +435,8 @@ void homa_grant_insert_grantable(struct homa_rpc *rpc) * homa_grant_manage_rpc() - Insert an RPC into the priority-based data * structures for managing grantable RPCs (active_rpcs or grantable_peers). * Ensures that the RPC will be sent grants as needed. - * @rpc: The RPC to add. Must be locked by caller. + * @rpc: The RPC to add. Must be locked by caller. May already be + * inserted into the grant structures. */ void homa_grant_manage_rpc(struct homa_rpc *rpc) __must_hold(rpc->bucket->lock) @@ -452,10 +445,13 @@ void homa_grant_manage_rpc(struct homa_rpc *rpc) struct homa_rpc *bumped; u64 time = homa_clock(); - BUG_ON(rpc->msgin.rank >= 0 || !list_empty(&rpc->grantable_links)); - homa_grant_lock(grant); + if (rpc->msgin.rank >= 0 || !list_empty(&rpc->grantable_links)) { + homa_grant_unlock(grant); + return; + } + INC_METRIC(grantable_rpcs_integral, grant->num_grantable_rpcs * (time - grant->last_grantable_change)); grant->last_grantable_change = time; @@ -681,9 +677,10 @@ void homa_grant_send(struct homa_rpc *rpc, int priority) /** * homa_grant_check_rpc() - This function is responsible for generating - * grant packets. Is invoked whenever a data packet arrives for RPC; it - * checks the state of that RPC (as well as other RPCs) and generates - * grant packets as appropriate. 
+ * grant packets. It is invoked when the state of an RPC has changed in + * ways that might permit grants to be issued (either to this RPC or other + * RPCs), such as the arrival of a DATA packet. It reviews the state of + * grants and issues grant packets as appropriate. * @rpc: RPC to check. Must be locked by the caller. */ void homa_grant_check_rpc(struct homa_rpc *rpc) @@ -733,6 +730,17 @@ void homa_grant_check_rpc(struct homa_rpc *rpc) rpc->msgin.length); INC_METRIC(grant_check_calls, 1); + /* Races can cause the test below to invoke homa_grant_manage_rpc when + * rpc is already managed, but it will never fail to invoke + * homa_grant_manage_rpc if the RPC is unmanaged. This is an + * optimization to reduce the number of times the grant lock must + * be acquired. + */ + if (rpc->msgin.granted < rpc->msgin.length && + READ_ONCE(rpc->msgin.rank) < 0 && + list_empty(&rpc->grantable_links)) + homa_grant_manage_rpc(rpc); + needy_rank = INT_MAX; now = homa_clock(); homa_grant_update_incoming(rpc, grant); diff --git a/homa_pool.c b/homa_pool.c index 9bc6b33f..64f7b0c5 100644 --- a/homa_pool.c +++ b/homa_pool.c @@ -539,8 +539,7 @@ void homa_pool_check_waiting(struct homa_pool *pool) resend.length = htonl(-1); resend.priority = homa_high_priority(rpc->hsk->homa); homa_xmit_control(RESEND, &resend, sizeof(resend), rpc); - if (rpc->msgin.granted < rpc->msgin.length) - homa_grant_manage_rpc(rpc); + homa_grant_check_rpc(rpc); } #endif /* See strip.py */ homa_rpc_unlock(rpc); diff --git a/homa_rpc.h b/homa_rpc.h index 16750746..bf338a43 100644 --- a/homa_rpc.h +++ b/homa_rpc.h @@ -205,7 +205,7 @@ struct homa_message_in { /** * @prev_grant: Offset in the last GRANT packet sent for this RPC - * (initially set to unscheduled bytes). + * (initially set to unscheduled bytes). Managed by homa_grant.c. */ int prev_grant; @@ -421,8 +421,7 @@ struct homa_rpc { /** * @grantable_links: Used to link this RPC into peer->grantable_rpcs. * If this RPC isn't in peer->grantable_rpcs, this is an empty - * list pointing to itself. Must hold homa->grant->lock when - * accessing. + * list pointing to itself. Must hold homa->grant->lock when accessing. 
*/ struct list_head grantable_links; #endif /* See strip.py */ diff --git a/test/unit_homa_grant.c b/test/unit_homa_grant.c index 150edd99..0b9b590b 100644 --- a/test/unit_homa_grant.c +++ b/test/unit_homa_grant.c @@ -152,6 +152,7 @@ static struct homa_rpc *test_rpc_init(FIXTURE_DATA(homa_grant) *self, self->client_ip, server_ip, self->server_port, id, 1000, size); homa_message_in_init(rpc, size, 0); + homa_grant_manage_rpc(rpc); return rpc; } @@ -200,30 +201,18 @@ TEST_F(homa_grant, homa_grant_free__sysctls_not_registered) EXPECT_STREQ("", unit_log_get()); } -TEST_F(homa_grant, homa_grant_init_rpc__grants_not_needed) +TEST_F(homa_grant, homa_grant_init_rpc__basics) { struct homa_rpc *rpc; rpc= unit_client_rpc(&self->hsk, UNIT_OUTGOING, self->client_ip, self->server_ip, self->server_port, 100, 1000, 20000); - homa_message_in_init(rpc, 2000, 2000); + homa_message_in_init(rpc, 2000, 800); EXPECT_EQ(-1, rpc->msgin.rank); - EXPECT_EQ(2000, rpc->msgin.granted); -} -TEST_F(homa_grant, homa_grant_init_rpc__grants_needed) -{ - struct homa_rpc *rpc; - - rpc= unit_client_rpc(&self->hsk, UNIT_OUTGOING, self->client_ip, - self->server_ip, self->server_port, 100, 1000, - 20000); - - homa_message_in_init(rpc, 5000, 2000); - EXPECT_EQ(0, rpc->msgin.rank); - EXPECT_EQ(2000, rpc->msgin.granted); + EXPECT_EQ(800, rpc->msgin.granted); } -TEST_F(homa_grant, homa_grant_init_rpc__no_bpages_available) +TEST_F(homa_grant, homa_grant_init_rpc__unsched_past_end_of_message) { struct homa_rpc *rpc; @@ -231,11 +220,9 @@ TEST_F(homa_grant, homa_grant_init_rpc__no_bpages_available) self->server_ip, self->server_port, 100, 1000, 20000); - atomic_set(&self->hsk.buffer_pool->free_bpages, 0); - homa_message_in_init(rpc, 20000, 10000); - EXPECT_EQ(0, rpc->msgin.num_bpages); + homa_message_in_init(rpc, 2000, 3000); EXPECT_EQ(-1, rpc->msgin.rank); - EXPECT_EQ(10000, rpc->msgin.granted); + EXPECT_EQ(2000, rpc->msgin.granted); } TEST_F(homa_grant, homa_grant_end_rpc__basics) @@ -1079,15 +1066,16 @@ TEST_F(homa_grant, homa_grant_check_rpc__rpc_dead) EXPECT_EQ(0, rpc->msgin.granted); rpc->state = RPC_INCOMING; } -TEST_F(homa_grant, homa_grant_check_rpc__update_incoming_even_if_rpc_no_longer_active) +TEST_F(homa_grant, homa_grant_check_rpc__update_incoming_even_if_rpc_not_active) { - struct homa_rpc *rpc = unit_client_rpc(&self->hsk, UNIT_OUTGOING, - self->client_ip, self->server_ip, self->server_port, - 100, 1000, 2000); + struct homa_rpc *rpc; + + self->homa.grant->max_overcommit = 1; + test_rpc_init(self, 100, self->server_ip, 20000); + rpc = test_rpc_init(self, 102, self->server_ip, 30000); + + EXPECT_EQ(-1, rpc->msgin.rank); - homa_message_in_init(rpc, 2000, 0); - EXPECT_EQ(0, rpc->msgin.rank); - rpc->msgin.rank = -1; rpc->msgin.rec_incoming = 100; atomic_set(&self->homa.grant->total_incoming, 1000); unit_log_clear(); @@ -1160,13 +1148,13 @@ TEST_F(homa_grant, homa_grant_check_rpc__fast_path) } TEST_F(homa_grant, homa_grant_check_rpc__skip_fast_path_rpc_not_active) { - struct homa_rpc *rpc = unit_client_rpc(&self->hsk, UNIT_OUTGOING, - self->client_ip, self->server_ip, self->server_port, - 100, 1000, 20000); + struct homa_rpc *rpc; - homa_message_in_init(rpc, 20000, 0); - EXPECT_EQ(0, rpc->msgin.rank); - rpc->msgin.rank = -1; + self->homa.grant->max_overcommit = 1; + test_rpc_init(self, 100, self->server_ip, 20000); + rpc = test_rpc_init(self, 102, self->server_ip, 30000); + + EXPECT_EQ(-1, rpc->msgin.rank); unit_log_clear(); homa_rpc_lock(rpc); From 8c73633a4009703e48d0dec97a95565203de3913 Mon Sep 17 00:00:00 2001 From: John 
Ousterhout Date: Thu, 22 Jan 2026 09:33:21 -0800 Subject: [PATCH 3/4] Make messages entirely scheduled or entirely unscheduled --- homa_impl.h | 4 +- homa_incoming.c | 8 ++- homa_offload.c | 7 ++- homa_outgoing.c | 79 ++++++++++----------------- homa_plumbing.c | 22 +++++++- homa_qdisc.c | 4 +- homa_rpc.c | 2 +- homa_rpc.h | 21 +++----- notes.txt | 12 ++++- test/mock.c | 6 ++- test/unit_homa_incoming.c | 32 ++++++----- test/unit_homa_offload.c | 4 +- test/unit_homa_outgoing.c | 110 +++++++++++++++----------------------- test/unit_homa_plumbing.c | 21 ++++++++ test/unit_homa_qdisc.c | 31 ++++++++++- test/unit_homa_rpc.c | 5 +- test/unit_homa_timer.c | 6 +-- test/utils.c | 4 +- 18 files changed, 208 insertions(+), 170 deletions(-) diff --git a/homa_impl.h b/homa_impl.h index 61c4c912..692fa226 100644 --- a/homa_impl.h +++ b/homa_impl.h @@ -726,8 +726,7 @@ int homa_init(struct homa *homa); int homa_ioc_info(struct socket *sock, unsigned long arg); int homa_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg); int homa_load(void); -int homa_message_out_fill(struct homa_rpc *rpc, - struct iov_iter *iter, int xmit); +int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter); void homa_message_out_init(struct homa_rpc *rpc, int length); void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, struct homa_rpc *rpc); @@ -788,6 +787,7 @@ int homa_unsched_priority(struct homa *homa, struct homa_peer *peer, void homa_xmit_data(struct homa_rpc *rpc, bool force); void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc, int priority); +void homa_xmit_grant_request(struct homa_rpc *rpc, int length); #else /* See strip.py */ int homa_message_in_init(struct homa_rpc *rpc, int unsched); void homa_resend_data(struct homa_rpc *rpc, int start, int end); diff --git a/homa_incoming.c b/homa_incoming.c index 06181802..912c0ac9 100644 --- a/homa_incoming.c +++ b/homa_incoming.c @@ -177,6 +177,12 @@ void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb) goto discard; } + if (length == 0) + /* This is the initial packet for a scheduled message; its + * only purpose is to trigger grant generation. + */ + goto discard; + if (start == rpc->msgin.recv_end) { /* Common case: packet is sequential. */ rpc->msgin.recv_end += length; @@ -775,7 +781,7 @@ void homa_grant_pkt(struct sk_buff *skb, struct homa_rpc *rpc) if (new_offset > rpc->msgout.length) rpc->msgout.granted = rpc->msgout.length; } - rpc->msgout.sched_priority = h->priority; + rpc->msgout.priority = h->priority; homa_xmit_data(rpc, false); } consume_skb(skb); diff --git a/homa_offload.c b/homa_offload.c index 812b567c..57b2e82f 100644 --- a/homa_offload.c +++ b/homa_offload.c @@ -359,6 +359,8 @@ struct sk_buff *homa_gro_receive(struct list_head *held_list, h_new->common.type, priority); #endif /* See strip.py */ } + if (homa->gro_policy & 0x100) + goto bypass; /* The GRO mechanism tries to separate packets onto different * gro_lists by hash. 
This is bad for us, because we want to batch @@ -619,7 +621,10 @@ int homa_gro_complete(struct sk_buff *skb, int hoffset) // NAPI_GRO_CB(skb)->count); per_cpu(homa_offload_core, smp_processor_id()).held_skb = NULL; - if (homa->gro_policy & HOMA_GRO_GEN3) { + if (homa->gro_policy & 0x200) { + tt_record("Set softirq CPU to current core"); + homa_set_softirq_cpu(skb, smp_processor_id()); + } else if (homa->gro_policy & HOMA_GRO_GEN3) { homa_gro_gen3(homa, skb); } else if (homa->gro_policy & HOMA_GRO_GEN2) { homa_gro_gen2(homa, skb); diff --git a/homa_outgoing.c b/homa_outgoing.c index 12cce77a..291a1ca8 100644 --- a/homa_outgoing.c +++ b/homa_outgoing.c @@ -30,9 +30,10 @@ void homa_message_out_init(struct homa_rpc *rpc, int length) rpc->msgout.length = length; rpc->msgout.next_xmit = &rpc->msgout.packets; #ifndef __STRIP__ /* See strip.py */ - rpc->msgout.unscheduled = rpc->hsk->homa->unsched_bytes; - if (rpc->msgout.unscheduled > length) - rpc->msgout.unscheduled = length; + rpc->msgout.priority = homa_unsched_priority(rpc->hsk->homa, rpc->peer, + length); + if (rpc->hsk->homa->unsched_bytes >= length) + rpc->msgout.granted = length; #endif /* See strip.py */ rpc->msgout.init_time = homa_clock(); } @@ -165,7 +166,7 @@ struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, h->common.checksum = 0; h->common.sender_id = cpu_to_be64(rpc->id); h->message_length = htonl(rpc->msgout.length); - IF_NO_STRIP(h->incoming = htonl(rpc->msgout.unscheduled)); + IF_NO_STRIP(h->incoming = htonl(rpc->msgout.granted)); h->ack.client_id = 0; homa_peer_get_acks(rpc->peer, 1, &h->ack); IF_NO_STRIP(h->cutoff_version = rpc->peer->cutoff_version); @@ -226,25 +227,21 @@ struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, } /** - * homa_message_out_fill() - Initializes information for sending a message + * homa_message_out_fill() - Initialize information for sending a message * for an RPC (either request or response); copies the message data from - * user space and (possibly) begins transmitting the message. + * user space. * @rpc: RPC for which to send message; this function must not * previously have been called for the RPC. Must be locked. The RPC * will be unlocked while copying data, but will be locked again * before returning. * @iter: Describes location(s) of message data in user space. - * @xmit: Nonzero means this method should start transmitting packets; - * transmission will be overlapped with copying from user space. - * Zero means the caller will initiate transmission after this - * function returns. * * Return: 0 for success, or a negative errno for failure. It is possible * for the RPC to be freed while this function is active. If that * happens, copying will cease, -EINVAL will be returned, and * rpc->state will be RPC_DEAD. Sets rpc->hsk->error_msg on errors. */ -int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) +int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter) __must_hold(rpc->bucket->lock) { /* Geometry information for packets: @@ -314,16 +311,12 @@ int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) max_gso_data = segs_per_gso * max_seg_data; UNIT_LOG("; ", "mtu %d, max_seg_data %d, max_gso_data %d", mtu, max_seg_data, max_gso_data); - -#ifndef __STRIP__ /* See strip.py */ - rpc->msgout.granted = rpc->msgout.unscheduled; -#endif /* See strip.py */ homa_skb_stash_pages(rpc->hsk->homa, rpc->msgout.length); /* Each iteration of the loop below creates one GSO packet. 
*/ #ifndef __STRIP__ /* See strip.py */ tt_record3("starting copy from user space for id %d, length %d, unscheduled %d", - rpc->id, rpc->msgout.length, rpc->msgout.unscheduled); + rpc->id, rpc->msgout.length, rpc->msgout.granted != 0); #else /* See strip.py */ tt_record2("starting copy from user space for id %d, length %d", rpc->id, rpc->msgout.length); @@ -336,15 +329,6 @@ int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) homa_rpc_unlock(rpc); skb_data_bytes = max_gso_data; offset = rpc->msgout.length - bytes_left; -#ifndef __STRIP__ /* See strip.py */ - if (offset < rpc->msgout.unscheduled && - (offset + skb_data_bytes) > rpc->msgout.unscheduled) { - /* Insert a packet boundary at the unscheduled limit, - * so we don't transmit extra data. - */ - skb_data_bytes = rpc->msgout.unscheduled - offset; - } -#endif /* See strip.py */ if (skb_data_bytes > bytes_left) skb_data_bytes = bytes_left; skb = homa_tx_data_pkt_alloc(rpc, iter, offset, skb_data_bytes, @@ -371,33 +355,11 @@ int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) rpc->msgout.skb_memory += skb->truesize; rpc->msgout.copied_from_user = rpc->msgout.length - bytes_left; rpc->msgout.first_not_tx = rpc->msgout.packets; -#ifndef __STRIP__ /* See strip.py */ - /* The code below improves pipelining for long messages - * by overlapping transmission with copying from user space. - * This is a bit tricky because sending the packets takes - * a significant amount time. On high-speed networks (e.g. - * 100 Gbps and above), copying from user space is the - * bottleneck, so transmitting the packets here will slow - * that down. Thus, we only transmit the unscheduled packets - * here, to fill the pipe. Packets after that can be - * transmitted by SoftIRQ in response to incoming grants; - * this allows us to use two cores: this core copying data - * and the SoftIRQ core sending packets. - */ - if (offset < rpc->msgout.unscheduled && xmit) - homa_xmit_data(rpc, false); -#endif /* See strip.py */ } tt_record2("finished copy from user space for id %d, length %d", rpc->id, rpc->msgout.length); INC_METRIC(sent_msg_bytes, rpc->msgout.length); refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc); - if (xmit) -#ifndef __STRIP__ /* See strip.py */ - homa_xmit_data(rpc, false); -#else /* See strip.py */ - homa_xmit_data(rpc); -#endif /* See strip.py */ return 0; error: @@ -542,6 +504,23 @@ void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk) } #ifndef __STRIP__ /* See strip.py */ +/** + * homa_xmit_grant_request() - Send an initial empty data packet for an outgoing + * RPC so that the peer will (eventually) send grants for the RPC. + * @rpc: RPC for which grants are needed: must not be an unscheduled RPC. + * @length: Number of bytes in outgoing message. 
+ */ +void homa_xmit_grant_request(struct homa_rpc *rpc, int length) +{ + struct homa_data_hdr h; + + memset(&h, 0, sizeof(h)); + h.message_length = htonl(length); + h.incoming = 0; + IF_NO_STRIP(h.cutoff_version = rpc->peer->cutoff_version); + homa_xmit_control(DATA, &h, sizeof(h), rpc); +} + /** * homa_xmit_data() - If an RPC has outbound data packets that are permitted * to be transmitted according to the scheduling mechanism, arrange for @@ -603,11 +582,7 @@ void homa_xmit_data(struct homa_rpc *rpc) } } - if (rpc->msgout.next_xmit_offset < rpc->msgout.unscheduled) - priority = homa_unsched_priority(homa, rpc->peer, - rpc->msgout.length); - else - priority = rpc->msgout.sched_priority; + priority = rpc->msgout.priority; #endif /* See strip.py */ rpc->msgout.next_xmit = &(homa_get_skb_info(skb)->next_skb); length = homa_get_skb_info(skb)->data_bytes; diff --git a/homa_plumbing.c b/homa_plumbing.c index e8a4a020..7760d7cf 100644 --- a/homa_plumbing.c +++ b/homa_plumbing.c @@ -1216,9 +1216,18 @@ int homa_sendmsg(struct sock *sk, struct msghdr *msg, size_t length) : tt_addr(addr->in6.sin6_addr), ntohs(addr->in6.sin6_port), rpc->id, length); rpc->completion_cookie = args.completion_cookie; - result = homa_message_out_fill(rpc, &msg->msg_iter, 1); +#ifndef __STRIP__ /* See strip.py */ + if (msg->msg_iter.count > hsk->homa->unsched_bytes) + homa_xmit_grant_request(rpc, msg->msg_iter.count); +#endif /* See strip.py */ + result = homa_message_out_fill(rpc, &msg->msg_iter); if (result) goto error; +#ifndef __STRIP__ /* See strip.py */ + homa_xmit_data(rpc, false); +#else /* See strip.py */ + homa_xmit_data(rpc); +#endif /* See strip.py */ args.id = rpc->id; homa_rpc_unlock(rpc); /* Locked by homa_rpc_alloc_client. */ @@ -1271,9 +1280,18 @@ int homa_sendmsg(struct sock *sk, struct msghdr *msg, size_t length) } rpc->state = RPC_OUTGOING; - result = homa_message_out_fill(rpc, &msg->msg_iter, 1); +#ifndef __STRIP__ /* See strip.py */ + if (msg->msg_iter.count > hsk->homa->unsched_bytes) + homa_xmit_grant_request(rpc, msg->msg_iter.count); +#endif /* See strip.py */ + result = homa_message_out_fill(rpc, &msg->msg_iter); if (result && rpc->state != RPC_DEAD) goto error; +#ifndef __STRIP__ /* See strip.py */ + homa_xmit_data(rpc, false); +#else /* See strip.py */ + homa_xmit_data(rpc); +#endif /* See strip.py */ homa_rpc_put(rpc); homa_rpc_unlock(rpc); /* Locked by homa_rpc_find_server. 
*/ #ifndef __STRIP__ /* See strip.py */ diff --git a/homa_qdisc.c b/homa_qdisc.c index 9fa07e64..8c546540 100755 --- a/homa_qdisc.c +++ b/homa_qdisc.c @@ -504,8 +504,8 @@ int homa_qdisc_enqueue(struct sk_buff *skb, struct Qdisc *sch, */ h = (struct homa_data_hdr *)skb_transport_header(skb); offset = homa_get_offset(h); - if (h->common.type != DATA || ntohl(h->message_length) < - qshared->defer_min_bytes) { + if (h->common.type != DATA || homa_data_len(skb) == 0 || + ntohl(h->message_length) < qshared->defer_min_bytes) { homa_qdisc_update_link_idle(qdev, pkt_len, -1); goto enqueue; } diff --git a/homa_rpc.c b/homa_rpc.c index 1b6a36e2..df683081 100644 --- a/homa_rpc.c +++ b/homa_rpc.c @@ -796,7 +796,7 @@ void homa_rpc_get_info(struct homa_rpc *rpc, struct homa_rpc_info *info) info->tx_sent = rpc->msgout.next_xmit_offset; #ifndef __STRIP__ /* See strip.py */ info->tx_granted = rpc->msgout.granted; - info->tx_prio = rpc->msgout.sched_priority; + info->tx_prio = rpc->msgout.priority; #else /* See strip.py */ info->tx_granted = rpc->msgout.length; #endif /* See strip.py */ diff --git a/homa_rpc.h b/homa_rpc.h index bf338a43..e886fd41 100644 --- a/homa_rpc.h +++ b/homa_rpc.h @@ -91,24 +91,15 @@ struct homa_message_out { #ifndef __STRIP__ /* See strip.py */ /** - * @unscheduled: Initial bytes of message that we'll send - * without waiting for grants. - */ - int unscheduled; - - /** - * @granted: Total number of bytes we are currently permitted to - * send, including unscheduled bytes; must wait for grants before - * sending bytes at or beyond this position. Never larger than - * @length. + * @granted: Total number of initial bytes we are currently permitted + * to send; must wait for grants before sending bytes at or beyond + * this position. For unscheduled messages, equals @length. Never + * larger than @length. */ int granted; - /** - * @sched_priority: Priority level to use for future scheduled - * packets. - */ - u8 sched_priority; + /** @priority: Priority level to use for transmitted packets. */ + u8 priority; #endif /* See strip.py */ /** diff --git a/notes.txt b/notes.txt index b008fc3d..bc47df1f 100755 --- a/notes.txt +++ b/notes.txt @@ -1,6 +1,14 @@ Notes for Homa implementation in Linux: --------------------------------------- +* Notes for new granting mechanism: + * Replace homa_data_hdr with a boolean @granted? Or eliminate completely + (receiver can figure out whether message is unscheduled)? + * Update tthoma.py (e.g. unscheduled) + * What happens if probe packet is lost? Retransmit probe? + * Update homa_prio.cc + * Compile with __STRIP__ + * (12/12/25) Something is wrong with the xl170 cluster: both Homa and TCP are showing considerably worse performance than previously. I tried multiple different clusters, and tried backing out to older versions of Homa and @@ -36,7 +44,7 @@ Notes for Homa implementation in Linux: configuration parameter fifo_mbps? Maybe the grant mechanism needs to be net_device-specific? -* Eliminate HOMA_FLAG_DONT_THROTTLE +* Eliminate HOMA_FLAG_DONT_THROTTLE (only needed for old pacer) * Optimizations for skb freeing: * In GRO, merge page frags out of skbs and return skbs to napi with @@ -52,7 +60,7 @@ Notes for Homa implementation in Linux: then send again, etc. Hmmm, I believe there is a reason why this won't work, but I have forgotten what it is. -* (July 2024) Found throughput problem in 2-node "--workload 50000 --one-way" +* (July 2024) Found throughput problem in 2-node "--workload 500000 --one-way" benchmark. 
The first packet for message N doesn't get sent until message N-1 has been completely transmitted. This results in a gap between the completion of message N and the arrival of a grant for message N+1, which diff --git a/test/mock.c b/test/mock.c index 9a0e3e15..721390ee 100644 --- a/test/mock.c +++ b/test/mock.c @@ -204,7 +204,9 @@ bool mock_ipv6 = true; /* The value to use for mock_ipv6 in each test unless overridden. */ bool mock_ipv6_default; -/* List of priorities for all outbound packets. */ +/* Information about priorities for outbound packets is appended to + * this string. + */ char mock_xmit_prios[1000]; int mock_xmit_prios_offset; @@ -1978,7 +1980,7 @@ void mock_put_page(struct page *page) * @saddr: IPv6 address to use as the sender of the packet, in * network byte order. * @protocol: Protocol to use in the IP header, such as IPPROTO_HOMA. - * @length: How many bytes of space to allocated after the IP header. + * @length: How many bytes of space to allocate after the IP header. * Return: The new packet buffer, initialized as if the packet just * arrived from the network and is about to be processed at * transport level (e.g. there will be an IP header before diff --git a/test/unit_homa_incoming.c b/test/unit_homa_incoming.c index 4ce7e5e0..1bf5259f 100644 --- a/test/unit_homa_incoming.c +++ b/test/unit_homa_incoming.c @@ -348,6 +348,18 @@ TEST_F(homa_incoming, homa_add_packet__packet_overlaps_message_end) &self->data.common, 1400, 1400)); EXPECT_EQ(0, skb_queue_len(&crpc->msgin.packets)); } +TEST_F(homa_incoming, homa_add_packet__no_data_in_packet) +{ + struct homa_rpc *crpc = unit_client_rpc(&self->hsk, + UNIT_OUTGOING, self->client_ip, self->server_ip, + self->server_port, 99, 1000, 1000); + + homa_message_in_init(crpc, 10000, 0); + unit_log_clear(); + homa_add_packet(crpc, mock_skb_alloc(self->client_ip, + &self->data.common, 0, 0)); + EXPECT_EQ(0, skb_queue_len(&crpc->msgin.packets)); +} TEST_F(homa_incoming, homa_add_packet__sequential_packets) { struct homa_rpc *crpc = unit_client_rpc(&self->hsk, @@ -1187,7 +1199,6 @@ TEST_F(homa_incoming, homa_dispatch_pkts__reset_counters) .offset = htonl(12600), .priority = 3}; ASSERT_NE(NULL, crpc); - EXPECT_EQ(10000, crpc->msgout.granted); unit_log_clear(); crpc->silent_ticks = 5; crpc->peer->outstanding_resends = 2; @@ -1261,9 +1272,6 @@ TEST_F(homa_incoming, homa_dispatch_pkts__unknown_type) self->server_port, self->client_id, 20000, 1600); ASSERT_NE(NULL, crpc); -#ifndef __STRIP__ /* See strip.py */ - EXPECT_EQ(10000, crpc->msgout.granted); -#endif /* See strip.py */ unit_log_clear(); struct homa_common_hdr h = {.sport = htons(self->server_port), @@ -1605,7 +1613,7 @@ TEST_F(homa_incoming, homa_grant_pkt__basics) .dport = htons(self->hsk.port), .sender_id = cpu_to_be64(self->client_id), .type = GRANT}, - .offset = htonl(11000), + .offset = htonl(1000), .priority = 3}; ASSERT_NE(NULL, srpc); @@ -1615,22 +1623,22 @@ TEST_F(homa_incoming, homa_grant_pkt__basics) unit_log_clear(); homa_dispatch_pkts(mock_skb_alloc(self->client_ip, &h.common, 0, 0)); - EXPECT_EQ(11000, srpc->msgout.granted); - EXPECT_STREQ("xmit DATA 1400@10000", unit_log_get()); + EXPECT_EQ(1000, srpc->msgout.granted); + EXPECT_STREQ("xmit DATA 1400@0", unit_log_get()); /* Don't let grant offset go backwards. 
*/ - h.offset = htonl(10000); + h.offset = htonl(900); unit_log_clear(); homa_dispatch_pkts(mock_skb_alloc(self->client_ip, &h.common, 0, 0)); - EXPECT_EQ(11000, srpc->msgout.granted); + EXPECT_EQ(1000, srpc->msgout.granted); EXPECT_STREQ("", unit_log_get()); /* Wrong state. */ - h.offset = htonl(20000); + h.offset = htonl(5000); srpc->state = RPC_INCOMING; unit_log_clear(); homa_dispatch_pkts(mock_skb_alloc(self->client_ip, &h.common, 0, 0)); - EXPECT_EQ(11000, srpc->msgout.granted); + EXPECT_EQ(1000, srpc->msgout.granted); EXPECT_STREQ("", unit_log_get()); /* Must restore old state to avoid potential crashes. */ @@ -1725,6 +1733,7 @@ TEST_F(homa_incoming, homa_resend_pkt__negative_length_in_resend) ASSERT_NE(NULL, srpc); unit_log_clear(); srpc->msgout.next_xmit_offset = 2000; + srpc->msgout.granted = 3000; homa_dispatch_pkts(mock_skb_alloc(self->client_ip, &h.common, 0, 0)); EXPECT_STREQ("xmit DATA retrans 1400@0; " @@ -1945,7 +1954,6 @@ TEST_F(homa_incoming, homa_cutoffs_pkt_basics) .cutoff_version = 400}; ASSERT_NE(NULL, crpc); - EXPECT_EQ(10000, crpc->msgout.granted); unit_log_clear(); homa_dispatch_pkts(mock_skb_alloc(self->server_ip, &h.common, 0, 0)); diff --git a/test/unit_homa_offload.c b/test/unit_homa_offload.c index c64dd7a0..fde3914b 100644 --- a/test/unit_homa_offload.c +++ b/test/unit_homa_offload.c @@ -335,7 +335,7 @@ TEST_F(homa_offload, homa_gro_receive__fast_grant_optimization) h.common.dport = htons(self->hsk.port); h.common.sender_id = cpu_to_be64(client_id); h.common.type = GRANT; - h.offset = htonl(11000); + h.offset = htonl(1000); h.priority = 3; /* First attempt: HOMA_GRO_FAST_GRANTS not enabled. */ @@ -353,7 +353,7 @@ TEST_F(homa_offload, homa_gro_receive__fast_grant_optimization) result = homa_gro_receive(&self->empty_list, skb2); EXPECT_EQ(EINPROGRESS, -PTR_ERR(result)); EXPECT_EQ(1, homa_metrics_per_cpu()->gro_grant_bypasses); - EXPECT_SUBSTR("xmit DATA 1400@10000", unit_log_get()); + EXPECT_SUBSTR("xmit DATA 1400@0", unit_log_get()); /* Third attempt: core is too busy for fast grants. */ cur_offload_core->last_gro = 600; diff --git a/test/unit_homa_outgoing.c b/test/unit_homa_outgoing.c index efe136ab..9339e1e1 100644 --- a/test/unit_homa_outgoing.c +++ b/test/unit_homa_outgoing.c @@ -115,21 +115,23 @@ FIXTURE_TEARDOWN(homa_outgoing) } #ifndef __STRIP__ /* See strip.py */ -TEST_F(homa_outgoing, set_priority__priority_mapping) +TEST_F(homa_outgoing, homa_message_out_init) { - struct homa_grant_hdr h; struct homa_rpc *srpc; srpc = unit_server_rpc(&self->hsk, UNIT_RCVD_ONE_PKT, self->client_ip, self->server_ip, self->client_port, 1111, 10000, 10000); ASSERT_NE(NULL, srpc); - h.offset = htonl(12345); - h.priority = 4; - EXPECT_EQ(0, homa_xmit_control(GRANT, &h, sizeof(h), srpc)); - self->homa.priority_map[7] = 3; - EXPECT_EQ(0, homa_xmit_control(GRANT, &h, sizeof(h), srpc)); - EXPECT_STREQ("7 3", mock_xmit_prios); + /* First call: message is scheduled. */ + self->homa.unsched_bytes = 10000; + homa_message_out_init(srpc, 10001); + EXPECT_EQ(0, srpc->msgout.granted); + + /* Second call: message is unscheduled. 
*/ + homa_message_out_init(srpc, 10000); + EXPECT_EQ(10000, srpc->msgout.granted); + EXPECT_EQ(6, srpc->msgout.priority); } #endif /* See strip.py */ @@ -380,7 +382,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__basics) ASSERT_FALSE(crpc == NULL); ASSERT_EQ(0, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 3000), 0)); + unit_iov_iter((void *) 1000, 3000))); homa_rpc_unlock(crpc); #ifndef __STRIP__ /* See strip.py */ EXPECT_EQ(3000, crpc->msgout.granted); @@ -413,8 +415,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__message_too_long) ASSERT_FALSE(crpc == NULL); EXPECT_EQ(EINVAL, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, HOMA_MAX_MESSAGE_LENGTH+1), - 0)); + unit_iov_iter((void *) 1000, HOMA_MAX_MESSAGE_LENGTH+1))); EXPECT_STREQ("message length exceeded HOMA_MAX_MESSAGE_LENGTH", self->hsk.error_msg); homa_rpc_unlock(crpc); @@ -428,7 +429,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__zero_length_message) ASSERT_FALSE(crpc == NULL); EXPECT_EQ(EINVAL, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 0), 0)); + unit_iov_iter((void *) 1000, 0))); homa_rpc_unlock(crpc); } #ifndef __STRIP__ /* See strip.py */ @@ -452,7 +453,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__gso_geometry_hijacking) 2 * UNIT_TEST_DATA_PER_PACKET; homa_rpc_lock(crpc1); ASSERT_EQ(0, -homa_message_out_fill(crpc1, - unit_iov_iter((void *) 1000, 10000), 0)); + unit_iov_iter((void *) 1000, 10000))); homa_rpc_unlock(crpc1); EXPECT_SUBSTR("max_seg_data 1400, max_gso_data 2800", unit_log_get()); @@ -461,7 +462,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__gso_geometry_hijacking) unit_log_clear(); homa_rpc_lock(crpc2); ASSERT_EQ(0, -homa_message_out_fill(crpc2, - unit_iov_iter((void *) 1000, 10000), 0)); + unit_iov_iter((void *) 1000, 10000))); homa_rpc_unlock(crpc2); EXPECT_SUBSTR("max_seg_data 1400, max_gso_data 4200", unit_log_get()); } @@ -479,7 +480,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__gso_geometry_no_hijacking) 2 * (UNIT_TEST_DATA_PER_PACKET + sizeof(struct homa_seg_hdr)); ASSERT_EQ(0, -homa_message_out_fill(crpc1, - unit_iov_iter((void *) 1000, 10000), 0)); + unit_iov_iter((void *) 1000, 10000))); homa_rpc_unlock(crpc1); EXPECT_SUBSTR("max_seg_data 1400, max_gso_data 2800", unit_log_get()); @@ -489,7 +490,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__gso_geometry_no_hijacking) self->dev->gso_max_size += 1; unit_log_clear(); ASSERT_EQ(0, -homa_message_out_fill(crpc2, - unit_iov_iter((void *) 1000, 10000), 0)); + unit_iov_iter((void *) 1000, 10000))); homa_rpc_unlock(crpc2); EXPECT_SUBSTR("max_seg_data 1400, max_gso_data 4200", unit_log_get()); } @@ -503,29 +504,10 @@ TEST_F(homa_outgoing, homa_message_out_fill__gso_limit_less_than_mtu) self->dev->gso_max_size = 10000; self->homa.max_gso_size = 1000; ASSERT_EQ(0, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 5000), 0)); + unit_iov_iter((void *) 1000, 5000))); homa_rpc_unlock(crpc); EXPECT_SUBSTR("max_seg_data 1400, max_gso_data 1400;", unit_log_get()); } -#ifndef __STRIP__ /* See strip.py */ -TEST_F(homa_outgoing, homa_message_out_fill__disable_overlap_xmit_because_of_homa_qdisc) -{ - struct homa_qdisc_dev *qdev; - struct homa_rpc *crpc; - - qdev = homa_qdisc_qdev_get(self->dev); - crpc = homa_rpc_alloc_client(&self->hsk, &self->server_addr); - - ASSERT_FALSE(crpc == NULL); - ASSERT_EQ(0, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 5000), 1)); - homa_rpc_unlock(crpc); - unit_log_clear(); - unit_log_throttled(&self->homa); - EXPECT_STREQ("", unit_log_get()); - 
homa_qdisc_qdev_put(qdev); -} -#endif /* See strip.py */ TEST_F(homa_outgoing, homa_message_out_fill__multiple_segs_per_skbuff) { struct homa_rpc *crpc = homa_rpc_alloc_client(&self->hsk, @@ -535,7 +517,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__multiple_segs_per_skbuff) self->dev->gso_max_size = 5000; unit_log_clear(); ASSERT_EQ(0, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 10000), 0)); + unit_iov_iter((void *) 1000, 10000))); homa_rpc_unlock(crpc); EXPECT_SUBSTR("_copy_from_iter 1400 bytes at 1000; " "_copy_from_iter 1400 bytes at 2400; " @@ -564,7 +546,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__error_in_homa_tx_data_packet_alloc) mock_copy_data_errors = 2; EXPECT_EQ(EFAULT, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 3000), 0)); + unit_iov_iter((void *) 1000, 3000))); EXPECT_STREQ("couldn't copy message body into packet buffers", self->hsk.error_msg); homa_rpc_unlock(crpc); @@ -583,7 +565,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__rpc_freed_during_copy) unit_hook_register(unlock_hook); hook_rpc = crpc; ASSERT_EQ(EINVAL, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 3000), 0)); + unit_iov_iter((void *) 1000, 3000))); EXPECT_STREQ("rpc deleted while creating outgoing message", self->hsk.error_msg); EXPECT_EQ(0, crpc->msgout.num_skbs); EXPECT_EQ(RPC_DEAD, crpc->state); @@ -591,25 +573,6 @@ TEST_F(homa_outgoing, homa_message_out_fill__rpc_freed_during_copy) EXPECT_EQ(1, refcount_read(&self->hsk.sock.sk_wmem_alloc)); homa_rpc_unlock(crpc); } -#ifndef __STRIP__ /* See strip.py */ -TEST_F(homa_outgoing, homa_message_out_fill__xmit_packets) -{ - struct homa_rpc *crpc = homa_rpc_alloc_client(&self->hsk, - &self->server_addr); - - ASSERT_FALSE(crpc == NULL); - self->homa.unsched_bytes = 2800; - ASSERT_EQ(0, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 5000), 1)); - homa_rpc_unlock(crpc); - EXPECT_SUBSTR(" _copy_from_iter 1400 bytes at 1000; " - "xmit DATA 1400@0; " - "_copy_from_iter 1400 bytes at 2400; " - "xmit DATA 1400@1400; " - "_copy_from_iter 1400 bytes at 3800; " - "_copy_from_iter 800 bytes at 5200", unit_log_get()); -} -#endif /* See strip.py */ TEST_F(homa_outgoing, homa_message_out_fill__packet_memory_accounting) { struct homa_rpc *crpc = homa_rpc_alloc_client(&self->hsk, @@ -619,7 +582,7 @@ TEST_F(homa_outgoing, homa_message_out_fill__packet_memory_accounting) ASSERT_FALSE(crpc == NULL); ASSERT_EQ(0, -homa_message_out_fill(crpc, - unit_iov_iter((void *) 1000, 3000), 0)); + unit_iov_iter((void *) 1000, 3000))); homa_rpc_unlock(crpc); unit_log_clear(); EXPECT_EQ(3, crpc->msgout.num_skbs); @@ -764,6 +727,20 @@ TEST_F(homa_outgoing, homa_xmit_unknown) unit_log_get()); kfree_skb(skb); } + +TEST_F(homa_outgoing, homa_xmit_grant_request) +{ + struct homa_rpc *crpc = unit_client_rpc(&self->hsk, + UNIT_OUTGOING, self->client_ip, self->server_ip, + self->server_port, self->client_id, 6000, 1000); + + mock_xmit_log_verbose = 1; + unit_log_clear(); + homa_xmit_grant_request(crpc, 2500); + EXPECT_STREQ("xmit DATA from 0.0.0.0:40000, dport 99, id 1234, " + "message_length 2500, offset 0, data_length 0, incoming 0", + unit_log_get()); +} #endif /* See strip.py */ TEST_F(homa_outgoing, homa_xmit_data__basics) @@ -773,8 +750,7 @@ TEST_F(homa_outgoing, homa_xmit_data__basics) self->server_port, self->client_id, 6000, 1000); #ifndef __STRIP__ /* See strip.py */ - crpc->msgout.sched_priority = 2; - crpc->msgout.unscheduled = 2000; + crpc->msgout.priority = 2; crpc->msgout.granted = 5000; homa_peer_set_cutoffs(crpc->peer, 
INT_MAX, 0, 0, 0, 0, INT_MAX, 7000, 0); @@ -792,7 +768,7 @@ TEST_F(homa_outgoing, homa_xmit_data__basics) "xmit DATA 1400@1400; " "xmit DATA 1400@2800; " "xmit DATA 1400@4200", unit_log_get()); - EXPECT_STREQ("6 6 2 2", mock_xmit_prios); + EXPECT_STREQ("2 2 2 2", mock_xmit_prios); EXPECT_EQ(5600, crpc->msgout.next_xmit_offset); unit_log_clear(); unit_log_throttled(&self->homa); @@ -962,7 +938,6 @@ TEST_F(homa_outgoing, homa_xmit_data__rpc_freed) self->server_port, self->client_id, 6000, 1000); #ifndef __STRIP__ /* See strip.py */ - crpc->msgout.unscheduled = 2000; crpc->msgout.granted = 5000; #else /* See strip.py */ unit_reset_tx(crpc); @@ -1076,8 +1051,8 @@ TEST_F(homa_outgoing, homa_resend_data__basics) homa_resend_data(crpc, 7000, 10000, 2); #ifndef __STRIP__ /* See strip.py */ EXPECT_STREQ("xmit DATA from 0.0.0.0:40000, dport 99, id 1234, message_length 16000, offset 7000, data_length 1400, incoming 10000, RETRANSMIT; " - "xmit DATA from 0.0.0.0:40000, dport 99, id 1234, message_length 16000, offset 8400, data_length 1400, incoming 10000, RETRANSMIT; " - "xmit DATA from 0.0.0.0:40000, dport 99, id 1234, message_length 16000, offset 9800, data_length 200, incoming 10000, RETRANSMIT", + "xmit DATA from 0.0.0.0:40000, dport 99, id 1234, message_length 16000, offset 8400, data_length 1400, incoming 10000, RETRANSMIT; " + "xmit DATA from 0.0.0.0:40000, dport 99, id 1234, message_length 16000, offset 9800, data_length 1400, incoming 10000, RETRANSMIT", unit_log_get()); #else /* See strip.py */ EXPECT_STREQ("xmit DATA from 0.0.0.0:40000, dport 99, id 1234, message_length 16000, offset 7000, data_length 1400, RETRANSMIT; " @@ -1155,7 +1130,6 @@ TEST_F(homa_outgoing, homa_resend_data__set_incoming) 16000, 1000); unit_log_clear(); mock_xmit_log_verbose = 1; - EXPECT_EQ(10000, crpc->msgout.granted); homa_resend_data(crpc, 8400, 8800, 2); EXPECT_SUBSTR("incoming 8800", unit_log_get()); } @@ -1180,6 +1154,7 @@ TEST_F(homa_outgoing, homa_resend_data__update_to_free_and_set_homa_info) struct homa_skb_info *homa_info; struct homa_rpc *crpc; struct sk_buff *skb; + int num_skbs; mock_set_ipv6(&self->hsk); self->dev->gso_max_size = 5000; @@ -1187,6 +1162,7 @@ TEST_F(homa_outgoing, homa_resend_data__update_to_free_and_set_homa_info) self->server_ip, self->server_port, self->client_id, 16000, 1000); unit_log_clear(); + num_skbs = crpc->msgout.num_skbs; homa_resend_data(crpc, 8400, 8800, 2); skb = crpc->msgout.to_free; ASSERT_NE(NULL, skb); @@ -1198,7 +1174,7 @@ TEST_F(homa_outgoing, homa_resend_data__update_to_free_and_set_homa_info) EXPECT_EQ(8400, homa_info->offset); EXPECT_EQ(crpc, homa_info->rpc); EXPECT_EQ(1, refcount_read(&skb->users)); - IF_NO_STRIP(EXPECT_EQ(6, crpc->msgout.num_skbs)); + IF_NO_STRIP(EXPECT_EQ(num_skbs + 1, crpc->msgout.num_skbs)); } TEST_F(homa_outgoing, homa_rpc_tx_end) diff --git a/test/unit_homa_plumbing.c b/test/unit_homa_plumbing.c index 8c1a292a..196d9bc2 100644 --- a/test/unit_homa_plumbing.c +++ b/test/unit_homa_plumbing.c @@ -830,6 +830,14 @@ TEST_F(homa_plumbing, homa_sendmsg__request_sent_successfully) EXPECT_EQ(88888, crpc->completion_cookie); homa_rpc_unlock(crpc); } +TEST_F(homa_plumbing, homa_sendmsg__request_grants_for_scheduled_message) +{ + self->homa.unsched_bytes = 100; + EXPECT_EQ(0, -homa_sendmsg(&self->hsk.inet.sk, + &self->sendmsg_hdr, self->sendmsg_hdr.msg_iter.count)); + EXPECT_SUBSTR("xmit DATA 0@0", unit_log_get()); + ASSERT_EQ(1, unit_list_length(&self->hsk.active_rpcs)); +} #ifndef __STRIP__ /* See strip.py */ TEST_F(homa_plumbing, 
homa_sendmsg__request_metrics) { @@ -895,6 +903,18 @@ TEST_F(homa_plumbing, homa_sendmsg__response_wrong_state) EXPECT_EQ(RPC_INCOMING, srpc->state); EXPECT_EQ(1, unit_list_length(&self->hsk.active_rpcs)); } +TEST_F(homa_plumbing, homa_sendmsg__request_grants_for_scheduled_response) +{ + unit_server_rpc(&self->hsk, UNIT_IN_SERVICE, self->client_ip, + self->server_ip, self->client_port, self->server_id, + 2000, 100); + self->sendmsg_args.id = self->server_id; + self->homa.unsched_bytes = 100; + EXPECT_EQ(0, -homa_sendmsg(&self->hsk.inet.sk, + &self->sendmsg_hdr, self->sendmsg_hdr.msg_iter.count)); + EXPECT_SUBSTR("xmit DATA 0@0", unit_log_get()); + EXPECT_EQ(1, unit_list_length(&self->hsk.active_rpcs)); +} TEST_F(homa_plumbing, homa_sendmsg__homa_message_out_fill_returns_error) { struct homa_rpc *srpc = unit_server_rpc(&self->hsk, UNIT_IN_SERVICE, @@ -934,6 +954,7 @@ TEST_F(homa_plumbing, homa_sendmsg__response_succeeds) self->sendmsg_args.id = self->server_id; EXPECT_EQ(0, -homa_sendmsg(&self->hsk.inet.sk, &self->sendmsg_hdr, self->sendmsg_hdr.msg_iter.count)); + EXPECT_SUBSTR("xmit DATA 200@0", unit_log_get()); EXPECT_EQ(RPC_OUTGOING, srpc->state); EXPECT_EQ(1, unit_list_length(&self->hsk.active_rpcs)); } diff --git a/test/unit_homa_qdisc.c b/test/unit_homa_qdisc.c index 5126f0d2..15021e43 100644 --- a/test/unit_homa_qdisc.c +++ b/test/unit_homa_qdisc.c @@ -38,8 +38,11 @@ static struct sk_buff *new_test_skb(struct homa_rpc *rpc, }; data.message_length = htonl(rpc->msgout.length); data.seg.offset = htonl(offset); - skb = mock_skb_alloc(saddr, &data.common, - length + sizeof(struct homa_skb_info), 0); + skb = mock_raw_skb(saddr, IPPROTO_HOMA, sizeof(data) + length + + sizeof(*info)); + memcpy(skb_put(skb, sizeof(data)), &data, sizeof(data)); + if (length != 0) + unit_fill_data(skb_put(skb, length), length, 0); info = homa_get_skb_info(skb); info->rpc = rpc; info->data_bytes = length; @@ -607,6 +610,30 @@ TEST_F(homa_qdisc, homa_qdisc_enqueue__defer_tcp_packet_because_of_link_idle_tim EXPECT_EQ(1, skb_queue_len(&q->deferred_tcp)); EXPECT_EQ(1000000, atomic64_read(&q->qdev->link_idle_time)); } +TEST_F(homa_qdisc, homa_qdisc_enqueue__empty_data_packet) +{ + struct homa_qdisc *q = init_qdisc(self->qdiscs[3]); + struct sk_buff *skb, *to_free; + struct homa_rpc *srpc; + + srpc = unit_server_rpc(&self->hsk, UNIT_OUTGOING, &self->client_ip, + &self->server_ip, self->client_port, + self->server_id, 100, 20000); + ASSERT_NE(NULL, srpc); + + atomic64_set(&q->qdev->link_idle_time, 1000000); + skb = new_test_skb(srpc, &self->addr, 0, 0); + to_free = NULL; + unit_log_clear(); + + EXPECT_EQ(NET_XMIT_SUCCESS, + homa_qdisc_enqueue(skb, q->qdisc, &to_free)); + EXPECT_EQ(NULL, to_free); + EXPECT_FALSE(homa_qdisc_any_deferred(q->qdev)); + EXPECT_EQ(1, q->qdisc->q.qlen); + EXPECT_STREQ("", unit_log_get()); + EXPECT_LT(1000000, atomic64_read(&q->qdev->link_idle_time)); +} TEST_F(homa_qdisc, homa_qdisc_enqueue__short_homa_message) { struct homa_qdisc *q = init_qdisc(self->qdiscs[3]); diff --git a/test/unit_homa_rpc.c b/test/unit_homa_rpc.c index 9cd00841..4623a731 100644 --- a/test/unit_homa_rpc.c +++ b/test/unit_homa_rpc.c @@ -942,6 +942,7 @@ TEST_F(homa_rpc, homa_rpc_get_info__basics) struct homa_rpc_info info; crpc->completion_cookie = 1111; + crpc->msgout.priority = 4; homa_rpc_get_info(crpc, &info); EXPECT_EQ(AF_INET6, info.peer.in6.sin6_family); @@ -955,7 +956,7 @@ TEST_F(homa_rpc, homa_rpc_get_info__basics) EXPECT_EQ(1000, info.tx_length); EXPECT_EQ(1000, info.tx_sent); EXPECT_EQ(1000, info.tx_granted); - 
IF_NO_STRIP(EXPECT_EQ(0, info.tx_prio)); + IF_NO_STRIP(EXPECT_EQ(4, info.tx_prio)); EXPECT_EQ(20000, info.rx_length); EXPECT_EQ(18600, info.rx_remaining); EXPECT_EQ(0, info.rx_gaps); @@ -986,7 +987,7 @@ TEST_F(homa_rpc, homa_rpc_get_info__tx_incomplete) crpc->msgout.granted = 4000; crpc->msgout.next_xmit_offset = 1400; - crpc->msgout.sched_priority = 5; + crpc->msgout.priority = 5; homa_rpc_get_info(crpc, &info); EXPECT_EQ(5000, info.tx_length); diff --git a/test/unit_homa_timer.c b/test/unit_homa_timer.c index d578c32b..b0a66935 100644 --- a/test/unit_homa_timer.c +++ b/test/unit_homa_timer.c @@ -282,7 +282,7 @@ TEST_F(homa_timer, homa_timer__reap_dead_rpcs) ASSERT_NE(NULL, dead); homa_rpc_end(dead); #ifndef __STRIP__ /* See strip.py */ - EXPECT_EQ(31, self->hsk.dead_skbs); + EXPECT_EQ(30, self->hsk.dead_skbs); #else /* See strip.py */ EXPECT_EQ(30, self->hsk.dead_skbs); #endif /* See strip.py */ @@ -291,7 +291,7 @@ TEST_F(homa_timer, homa_timer__reap_dead_rpcs) self->homa.dead_buffs_limit = 32; homa_timer(&self->homa); #ifndef __STRIP__ /* See strip.py */ - EXPECT_EQ(31, self->hsk.dead_skbs); + EXPECT_EQ(30, self->hsk.dead_skbs); #else /* See strip.py */ EXPECT_EQ(30, self->hsk.dead_skbs); #endif /* See strip.py */ @@ -300,7 +300,7 @@ TEST_F(homa_timer, homa_timer__reap_dead_rpcs) self->homa.dead_buffs_limit = 15; homa_timer(&self->homa); #ifndef __STRIP__ /* See strip.py */ - EXPECT_EQ(11, self->hsk.dead_skbs); + EXPECT_EQ(10, self->hsk.dead_skbs); #else /* See strip.py */ EXPECT_EQ(10, self->hsk.dead_skbs); #endif /* See strip.py */ diff --git a/test/utils.c b/test/utils.c index 7193ad03..bdb94dd2 100644 --- a/test/utils.c +++ b/test/utils.c @@ -51,7 +51,7 @@ struct homa_rpc *unit_client_rpc(struct homa_sock *hsk, crpc = homa_rpc_alloc_client(hsk, &server_addr); if (IS_ERR(crpc)) return NULL; - if (homa_message_out_fill(crpc, unit_iov_iter(NULL, req_length), 0)) { + if (homa_message_out_fill(crpc, unit_iov_iter(NULL, req_length))) { homa_rpc_end(crpc); return NULL; } @@ -427,7 +427,7 @@ struct homa_rpc *unit_server_rpc(struct homa_sock *hsk, return srpc; homa_rpc_lock(srpc); status = homa_message_out_fill(srpc, unit_iov_iter((void *) 2000, - resp_length), 0); + resp_length)); homa_rpc_unlock(srpc); if (status != 0) goto error; From 9850731d7a0def67dfdcbc5210cf29955520194c Mon Sep 17 00:00:00 2001 From: John Ousterhout Date: Thu, 22 Jan 2026 19:45:55 -0800 Subject: [PATCH 4/4] More WIP, very unbaked right now. 
--- homa_devel.c | 23 +++++++++++++++++++++ homa_impl.h | 2 +- homa_incoming.c | 43 +++++++++++++++++++++++---------------- homa_outgoing.c | 12 +++++------ homa_plumbing.c | 13 ++++++++---- homa_rpc.c | 10 ++++----- homa_rpc.h | 4 ++-- homa_wire.h | 38 +++++++++++++++++++++++++++------- notes.txt | 5 ++++- test/mock.c | 5 +++++ test/unit_homa_outgoing.c | 7 +++---- test/unit_homa_plumbing.c | 4 ++-- timetrace.c | 2 +- 13 files changed, 116 insertions(+), 52 deletions(-) diff --git a/homa_devel.c b/homa_devel.c index 272aede9..55f6b384 100644 --- a/homa_devel.c +++ b/homa_devel.c @@ -284,6 +284,16 @@ char *homa_print_packet(struct sk_buff *skb, char *buffer, int buf_len) } break; } +#ifndef __STRIP__ /* See strip.py */ + case NEED_GRANT: { + struct homa_need_grant_hdr *h; + + h = (struct homa_need_grant_hdr *)header; + used = homa_snprintf(buffer, buf_len, used, ", length %d", + ntohl(h->length)); + break; + } +#endif /* See strip.py */ } buffer[buf_len - 1] = 0; @@ -386,6 +396,15 @@ char *homa_print_packet_short(struct sk_buff *skb, char *buffer, int buf_len) case ACK: snprintf(buffer, buf_len, "ACK"); break; +#ifndef __STRIP__ /* See strip.py */ + case NEED_GRANT: { + struct homa_need_grant_hdr *h; + + h = (struct homa_need_grant_hdr *)header; + snprintf(buffer, buf_len, "NEED_GRANT %d", ntohl(h->length)); + break; + } +#endif /* See strip.py */ default: snprintf(buffer, buf_len, "unknown packet type 0x%x", common->type); @@ -548,6 +567,10 @@ char *homa_symbol_for_type(uint8_t type) return "NEED_ACK"; case ACK: return "ACK"; +#ifndef __STRIP__ /* See strip.py */ + case NEED_GRANT: + return "NEED_GRANT"; +#endif /* See strip.py */ } return "??"; } diff --git a/homa_impl.h b/homa_impl.h index 692fa226..4154146c 100644 --- a/homa_impl.h +++ b/homa_impl.h @@ -787,7 +787,7 @@ int homa_unsched_priority(struct homa *homa, struct homa_peer *peer, void homa_xmit_data(struct homa_rpc *rpc, bool force); void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc, int priority); -void homa_xmit_grant_request(struct homa_rpc *rpc, int length); +void homa_xmit_need_grant(struct homa_rpc *rpc, int length); #else /* See strip.py */ int homa_message_in_init(struct homa_rpc *rpc, int unsched); void homa_resend_data(struct homa_rpc *rpc, int start, int end); diff --git a/homa_incoming.c b/homa_incoming.c index 912c0ac9..c205fd8e 100644 --- a/homa_incoming.c +++ b/homa_incoming.c @@ -450,9 +450,9 @@ void homa_dispatch_pkts(struct sk_buff *skb) #define MAX_ACKS 10 #endif /* __UNIT_TEST__ */ const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); - struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data; - u64 id = homa_local_id(h->common.sender_id); - int dport = ntohs(h->common.dport); + struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data; + u64 id = homa_local_id(h->sender_id); + int dport = ntohs(h->dport); struct homa_rpc *rpc = NULL; struct homa_sock *hsk; struct homa_net *hnet; @@ -469,8 +469,8 @@ void homa_dispatch_pkts(struct sk_buff *skb) icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0); tt_record3("Discarding packet(s) for unknown port %u, id %llu, type %d", - dport, homa_local_id(h->common.sender_id), - h->common.type); + dport, homa_local_id(h->sender_id), + h->type); while (skb) { next = skb->next; kfree_skb(skb); @@ -515,7 +515,8 @@ void homa_dispatch_pkts(struct sk_buff *skb) if (!rpc) { if (!homa_is_client(id)) { /* We are the server for this RPC. 
*/ - if (h->common.type == DATA) { + if (h->type == DATA || + h->type == NEED_GRANT) { int created; /* Create a new RPC if one doesn't @@ -541,34 +542,34 @@ void homa_dispatch_pkts(struct sk_buff *skb) } if (unlikely(!rpc)) { #ifndef __STRIP__ /* See strip.py */ - if (h->common.type != CUTOFFS && - h->common.type != NEED_ACK && + if (h->type != CUTOFFS && + h->type != NEED_ACK && #else /* See strip.py */ - if (h->common.type != NEED_ACK && + if (h->type != NEED_ACK && #endif /* See strip.py */ - h->common.type != ACK && - h->common.type != RESEND) { + h->type != ACK && + h->type != RESEND) { tt_record4("Discarding packet for unknown RPC, id %u, type %d, peer 0x%x:%d", - id, h->common.type, tt_addr(saddr), - ntohs(h->common.sport)); + id, h->type, tt_addr(saddr), + ntohs(h->sport)); #ifndef __STRIP__ /* See strip.py */ - if (h->common.type != GRANT || + if (h->type != GRANT || homa_is_client(id)) INC_METRIC(unknown_rpcs, 1); #endif /* See strip.py */ goto discard; } } else { - if (h->common.type == DATA || + if (h->type == DATA || #ifndef __STRIP__ /* See strip.py */ - h->common.type == GRANT || + h->type == GRANT || #endif /* See strip.py */ - h->common.type == BUSY) + h->type == BUSY) rpc->silent_ticks = 0; rpc->peer->outstanding_resends = 0; } - switch (h->common.type) { + switch (h->type) { case DATA: homa_data_pkt(skb, rpc); INC_METRIC(packets_received[DATA - DATA], 1); @@ -609,6 +610,12 @@ void homa_dispatch_pkts(struct sk_buff *skb) INC_METRIC(packets_received[ACK - DATA], 1); homa_ack_pkt(skb, hsk, rpc); break; +#ifndef __STRIP__ /* See strip.py */ + case NEED_GRANT: + INC_METRIC(packets_received[NEED_GRANT - DATA], 1); + homa_need_grant_pkt(skb, rpc); + break; +#endif /* See strip.py */ default: INC_METRIC(unknown_packet_types, 1); goto discard; diff --git a/homa_outgoing.c b/homa_outgoing.c index 291a1ca8..f1093327 100644 --- a/homa_outgoing.c +++ b/homa_outgoing.c @@ -505,20 +505,18 @@ void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk) #ifndef __STRIP__ /* See strip.py */ /** - * homa_xmit_grant_request() - Send an initial empty data packet for an outgoing + * homa_xmit_need_grant() - Send an initial empty data packet for an outgoing * RPC so that the peer will (eventually) send grants for the RPC. * @rpc: RPC for which grants are needed: must not be an unscheduled RPC. * @length: Number of bytes in outgoing message. 
*/ -void homa_xmit_grant_request(struct homa_rpc *rpc, int length) +void homa_xmit_need_grant(struct homa_rpc *rpc, int length) { - struct homa_data_hdr h; + struct homa_need_grant_hdr h; memset(&h, 0, sizeof(h)); - h.message_length = htonl(length); - h.incoming = 0; - IF_NO_STRIP(h.cutoff_version = rpc->peer->cutoff_version); - homa_xmit_control(DATA, &h, sizeof(h), rpc); + h.length = htonl(length); + homa_xmit_control(NEED_GRANT, &h, sizeof(h), rpc); } /** diff --git a/homa_plumbing.c b/homa_plumbing.c index 7760d7cf..5dc3091b 100644 --- a/homa_plumbing.c +++ b/homa_plumbing.c @@ -426,7 +426,8 @@ static u16 header_lengths[] = { sizeof(struct homa_cutoffs_hdr), sizeof(struct homa_freeze_hdr), sizeof(struct homa_need_ack_hdr), - sizeof(struct homa_ack_hdr) + sizeof(struct homa_ack_hdr), + sizeof(struct homa_need_grant_hdr) }; #else /* See strip.py */ static u16 header_lengths[] = { @@ -438,7 +439,8 @@ static u16 header_lengths[] = { 0, 0, sizeof(struct homa_need_ack_hdr), - sizeof(struct homa_ack_hdr) + sizeof(struct homa_ack_hdr), + 0 }; #endif /* See strip.py */ @@ -497,6 +499,9 @@ int __init homa_load(void) #endif /* See strip.py */ BUILD_BUG_ON(sizeof(struct homa_need_ack_hdr) > HOMA_MAX_HEADER); BUILD_BUG_ON(sizeof(struct homa_ack_hdr) > HOMA_MAX_HEADER); +#ifndef __STRIP__ /* See strip.py */ + BUILD_BUG_ON(sizeof(struct homa_need_grant_hdr) > HOMA_MAX_HEADER); +#endif /* See strip.py */ /* Extra constraints on data packets: * - Ensure minimum header length so Homa doesn't have to worry about @@ -1218,7 +1223,7 @@ int homa_sendmsg(struct sock *sk, struct msghdr *msg, size_t length) rpc->completion_cookie = args.completion_cookie; #ifndef __STRIP__ /* See strip.py */ if (msg->msg_iter.count > hsk->homa->unsched_bytes) - homa_xmit_grant_request(rpc, msg->msg_iter.count); + homa_xmit_need_grant(rpc, msg->msg_iter.count); #endif /* See strip.py */ result = homa_message_out_fill(rpc, &msg->msg_iter); if (result) @@ -1282,7 +1287,7 @@ int homa_sendmsg(struct sock *sk, struct msghdr *msg, size_t length) #ifndef __STRIP__ /* See strip.py */ if (msg->msg_iter.count > hsk->homa->unsched_bytes) - homa_xmit_grant_request(rpc, msg->msg_iter.count); + homa_xmit_need_grant(rpc, msg->msg_iter.count); #endif /* See strip.py */ result = homa_message_out_fill(rpc, &msg->msg_iter); if (result && rpc->state != RPC_DEAD) diff --git a/homa_rpc.c b/homa_rpc.c index df683081..0224b6f3 100644 --- a/homa_rpc.c +++ b/homa_rpc.c @@ -107,7 +107,7 @@ struct homa_rpc *homa_rpc_alloc_client(struct homa_sock *hsk, * @hsk: Socket that owns this RPC. * @source: IP address (network byte order) of the RPC's client. * @h: Header for the first data packet received for this RPC; used - * to initialize the RPC. + * to initialize the RPC. Must have type DATA or NEED_GRANT. * @created: Will be set to 1 if a new RPC was created and 0 if an * existing RPC was found. 
* @@ -117,10 +117,10 @@ struct homa_rpc *homa_rpc_alloc_client(struct homa_sock *hsk, */ struct homa_rpc *homa_rpc_alloc_server(struct homa_sock *hsk, const struct in6_addr *source, - struct homa_data_hdr *h, int *created) + struct homa_common_hdr *h, int *created) __cond_acquires(srpc->bucket->lock) { - u64 id = homa_local_id(h->common.sender_id); + u64 id = homa_local_id(h->sender_id); struct homa_rpc_bucket *bucket; struct homa_rpc *srpc = NULL; int err; @@ -135,7 +135,7 @@ struct homa_rpc *homa_rpc_alloc_server(struct homa_sock *hsk, homa_bucket_lock(bucket, id); hlist_for_each_entry(srpc, &bucket->rpcs, hash_links) { if (srpc->id == id && - srpc->dport == ntohs(h->common.sport) && + srpc->dport == ntohs(h->sport) && ipv6_addr_equal(&srpc->peer->addr, source)) { /* RPC already exists; just return it instead * of creating a new RPC. @@ -161,7 +161,7 @@ struct homa_rpc *homa_rpc_alloc_server(struct homa_sock *hsk, srpc->peer = NULL; goto error; } - srpc->dport = ntohs(h->common.sport); + srpc->dport = ntohs(h->sport); srpc->id = id; srpc->msgin.length = -1; srpc->msgout.length = -1; diff --git a/homa_rpc.h b/homa_rpc.h index e886fd41..c79eb221 100644 --- a/homa_rpc.h +++ b/homa_rpc.h @@ -183,7 +183,7 @@ struct homa_message_in { /** * @rank: Position of this RPC in homa->grant->active_rpcs, or -1 * if not in homa->grant->active_rpcs. Managed by homa_grant.c; - * unsafe to access unless holding homa->grant->lock. + * unsafe to modify unless holding homa->grant->lock. */ int rank; @@ -196,7 +196,7 @@ struct homa_message_in { /** * @prev_grant: Offset in the last GRANT packet sent for this RPC - * (initially set to unscheduled bytes). Managed by homa_grant.c. + * (initially 0). */ int prev_grant; diff --git a/homa_wire.h b/homa_wire.h index 948be506..c0e1d6d2 100644 --- a/homa_wire.h +++ b/homa_wire.h @@ -35,12 +35,16 @@ enum homa_packet_type { #endif /* See strip.py */ NEED_ACK = 0x17, ACK = 0x18, - MAX_OP = 0x18, +#ifndef __STRIP__ /* See strip.py */ + NEED_GRANT = 0x19, +#endif /* See strip.py */ + MAX_OP = 0x19, /* If you add a new type here, you must also do the following: * 1. Change MAX_OP so it is the highest valid opcode * 2. Add support for the new opcode in homa_print_packet, * homa_print_packet_short, homa_symbol_for_type, and mock_skb_alloc. * 3. Add the header length to header_lengths in homa_plumbing.c. + * 4. Add length validation for the header in homa_load. */ }; @@ -385,17 +389,16 @@ struct homa_grant_hdr { struct homa_common_hdr common; /** - * @offset: Byte offset within the message. - * - * The sender should now transmit all data up to (but not including) - * this offset ASAP, if it hasn't already. + * @offset: Byte offset within the message. The sender should now + * transmit all data packets with starting offset less than this + * that haven't already been transmitted. */ __be32 offset; /** * @priority: The sender should use this priority level for all future - * MESSAGE_FRAG packets for this message, until a GRANT is received - * with higher offset. Larger numbers indicate higher priorities. + * DATA packets for this message, until a GRANT is received with higher + * offset. Larger numbers indicate higher priorities. */ u8 priority; } __packed; @@ -532,6 +535,27 @@ struct homa_ack_hdr { struct homa_ack acks[HOMA_MAX_ACKS_PER_PKT]; } __packed; +#ifndef __STRIP__ /* See strip.py */ +/** + * struct homa_need_grant_hdr - Wire format for NEED_GRANT packets. 
+ * + * A NEED_GRANT packet is sent to begin the transmission of a scheduled + * message, so that the receiver knows that the message exists and can begin + * sending GRANTs for the message. The sender will not send DATA packets + * until it receives GRANTs. NEED_GRANT packets are not used for unscheduled + * messages: the sender sends DATA packets without waiting for GRANTs. + */ +struct homa_need_grant_hdr { + /** @common: Fields common to all packet types. */ + struct homa_common_hdr common; + + /** + * @length: Length of the message, in bytes. + */ + __be32 length; +} __packed; +#endif /* See strip.py */ + /** * homa_local_id(): given an RPC identifier from an input packet (which * is network-encoded), return the decoded id we should use for that diff --git a/notes.txt b/notes.txt index bc47df1f..1732ed41 100755 --- a/notes.txt +++ b/notes.txt @@ -5,9 +5,12 @@ Notes for Homa implementation in Linux: * Replace homa_data_hdr with a boolean @granted? Or eliminate completely (receiver can figure out whether message is unscheduled)? * Update tthoma.py (e.g. unscheduled) - * What happens if probe packet is lost? Retransmit probe? + * What happens if probe packet is lost? Retransmit probe? Must modify + homa_request_retrans. May also need to modify homa_rpc_unknown_pkt * Update homa_prio.cc * Compile with __STRIP__ + * Double-check the calculation of incoming in homa_rpc_alloc_server. + * Reduce tt_record calls in homa_grant.c? * (12/12/25) Something is wrong with the xl170 cluster: both Homa and TCP are showing considerably worse performance than previously. I tried multiple diff --git a/test/mock.c b/test/mock.c index 721390ee..fed2324c 100644 --- a/test/mock.c +++ b/test/mock.c @@ -2249,6 +2249,11 @@ struct sk_buff *mock_skb_alloc(struct in6_addr *saddr, case ACK: header_size = sizeof(struct homa_ack_hdr); break; +#ifndef __STRIP__ /* See strip.py */ + case NEED_GRANT: + header_size = sizeof(struct homa_need_grant_hdr); + break; +#endif /* See strip.py */ default: header_size = sizeof(struct homa_common_hdr); break; diff --git a/test/unit_homa_outgoing.c b/test/unit_homa_outgoing.c index 9339e1e1..8f8e7fad 100644 --- a/test/unit_homa_outgoing.c +++ b/test/unit_homa_outgoing.c @@ -728,7 +728,7 @@ TEST_F(homa_outgoing, homa_xmit_unknown) kfree_skb(skb); } -TEST_F(homa_outgoing, homa_xmit_grant_request) +TEST_F(homa_outgoing, homa_xmit_need_grant) { struct homa_rpc *crpc = unit_client_rpc(&self->hsk, UNIT_OUTGOING, self->client_ip, self->server_ip, @@ -736,9 +736,8 @@ TEST_F(homa_outgoing, homa_xmit_grant_request) mock_xmit_log_verbose = 1; unit_log_clear(); - homa_xmit_grant_request(crpc, 2500); - EXPECT_STREQ("xmit DATA from 0.0.0.0:40000, dport 99, id 1234, " - "message_length 2500, offset 0, data_length 0, incoming 0", + homa_xmit_need_grant(crpc, 2500); + EXPECT_STREQ("xmit NEED_GRANT from 0.0.0.0:40000, dport 99, id 1234, length 2500", unit_log_get()); } #endif /* See strip.py */ diff --git a/test/unit_homa_plumbing.c b/test/unit_homa_plumbing.c index 196d9bc2..dbb20960 100644 --- a/test/unit_homa_plumbing.c +++ b/test/unit_homa_plumbing.c @@ -835,7 +835,7 @@ TEST_F(homa_plumbing, homa_sendmsg__request_grants_for_scheduled_message) self->homa.unsched_bytes = 100; EXPECT_EQ(0, -homa_sendmsg(&self->hsk.inet.sk, &self->sendmsg_hdr, self->sendmsg_hdr.msg_iter.count)); - EXPECT_SUBSTR("xmit DATA 0@0", unit_log_get()); + EXPECT_SUBSTR("xmit NEED_GRANT 200", unit_log_get()); ASSERT_EQ(1, unit_list_length(&self->hsk.active_rpcs)); } #ifndef __STRIP__ /* See strip.py */ @@ -912,7 +912,7 @@ 
TEST_F(homa_plumbing, homa_sendmsg__request_grants_for_scheduled_response) self->homa.unsched_bytes = 100; EXPECT_EQ(0, -homa_sendmsg(&self->hsk.inet.sk, &self->sendmsg_hdr, self->sendmsg_hdr.msg_iter.count)); - EXPECT_SUBSTR("xmit DATA 0@0", unit_log_get()); + EXPECT_SUBSTR("xmit NEED_GRANT 200", unit_log_get()); EXPECT_EQ(1, unit_list_length(&self->hsk.active_rpcs)); } TEST_F(homa_plumbing, homa_sendmsg__homa_message_out_fill_returns_error) diff --git a/timetrace.c b/timetrace.c index e196ee48..74d2f463 100644 --- a/timetrace.c +++ b/timetrace.c @@ -12,7 +12,7 @@ * timetrace stubs; we will then connect the timetrace mechanism here with * those stubs to allow the rest of the kernel to log in our buffers. */ -//#define TT_KERNEL 1 +#define TT_KERNEL 1 #endif /* __UNIT_TEST__ */ #ifdef TT_KERNEL extern struct tt_buffer *tt_linux_buffers[];
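
Note: the hunks above add the NEED_GRANT wire format and the sender path
(homa_xmit_need_grant), but no receive-side handler appears in this section.
The sketch below is hypothetical and not part of this patch: the function
name homa_need_grant_pkt() is invented, homa_rpc_alloc_server() is used with
the homa_common_hdr signature introduced above, homa_grant_check_rpc() and
homa_rpc_unlock() are assumed to be the existing entry points for grant
management and RPC unlocking, and initialization of msgin beyond its length
is elided.

#ifndef __STRIP__ /* See strip.py */
/**
 * homa_need_grant_pkt() - Hypothetical sketch of receive-side handling for
 * a NEED_GRANT packet (invented name; not part of this patch).
 * @hsk:     Socket on which the packet arrived.
 * @saddr:   IP address of the sender (network byte order).
 * @h:       The incoming NEED_GRANT header.
 */
static void homa_need_grant_pkt(struct homa_sock *hsk,
                                const struct in6_addr *saddr,
                                struct homa_need_grant_hdr *h)
{
        struct homa_rpc *rpc;
        int created;

        /* Find or create the server RPC; NEED_GRANT can be accepted here
         * because homa_rpc_alloc_server now takes a homa_common_hdr.
         */
        rpc = homa_rpc_alloc_server(hsk, saddr, &h->common, &created);
        if (IS_ERR(rpc))
                return;

        if (rpc->msgin.length < 0) {
                /* First packet of the message: record its total length so
                 * grants can be computed (the rest of the msgin setup,
                 * e.g. buffer allocation, is elided from this sketch).
                 */
                rpc->msgin.length = ntohl(h->length);
        }

        /* Let the grant machinery decide whether this RPC should now be
         * managed and whether a GRANT can be issued immediately.
         */
        homa_grant_check_rpc(rpc);
        homa_rpc_unlock(rpc);
}
#endif /* See strip.py */

The new header carries only the message length (plus the common header),
which, per the struct comment above, is what the receiver needs in order to
know the message exists and begin sending GRANTs for it.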