From 96fb592c57a5b58f6891ef72ae7bd153fd1753f9 Mon Sep 17 00:00:00 2001 From: John Ousterhout Date: Mon, 26 Jan 2026 11:15:32 -0800 Subject: [PATCH 1/3] Use homa_qdisc by default in CloudLab config --- cloudlab/bin/config | 1 + 1 file changed, 1 insertion(+) diff --git a/cloudlab/bin/config b/cloudlab/bin/config index 5ccf574f..56642a41 100755 --- a/cloudlab/bin/config +++ b/cloudlab/bin/config @@ -840,6 +840,7 @@ while i < len(sys.argv): config_nic() config_power() config_rps() + config_qdisc() elif arg == "ecn_threshold": if i >= len(sys.argv): raise Exception("No argument provided for 'ecn_threshold' command"); From 483ea798f82d351a056f0da1ba8d50e318675d45 Mon Sep 17 00:00:00 2001 From: John Ousterhout Date: Mon, 26 Jan 2026 11:17:45 -0800 Subject: [PATCH 2/3] Rework mechanism for initiating grant management of RPCs Delay starting management of grants for RPCs until homa_grant_check_rpc is invoked. --- homa_grant.c | 42 +++++++++++++++++++------------- homa_pool.c | 3 +-- homa_rpc.h | 5 ++-- test/unit_homa_grant.c | 54 ++++++++++++++++-------------------------- 4 files changed, 49 insertions(+), 55 deletions(-) diff --git a/homa_grant.c b/homa_grant.c index 1ea24a8d..80e90b78 100644 --- a/homa_grant.c +++ b/homa_grant.c @@ -140,7 +140,7 @@ void homa_grant_free(struct homa_grant *grant) /** * homa_grant_init_rpc() - Initialize grant-related information for an - * RPC's incoming message (may add the RPC to grant priority queues). + * RPC's incoming message. * @rpc: RPC being initialized. Grant-related fields in msgin * are assumed to be zero. Must be locked by caller. * @unsched: Number of unscheduled bytes in the incoming message for @rpc. @@ -149,18 +149,10 @@ void homa_grant_init_rpc(struct homa_rpc *rpc, int unsched) __must_hold(rpc->bucket->lock) { rpc->msgin.rank = -1; - if (unsched >= rpc->msgin.length) { - rpc->msgin.granted = rpc->msgin.length; - rpc->msgin.prev_grant = rpc->msgin.granted; - return; - } + if (unsched >= rpc->msgin.length) + unsched = rpc->msgin.length; rpc->msgin.granted = unsched; rpc->msgin.prev_grant = unsched; - if (rpc->msgin.num_bpages != 0) - /* Can't issue grants unless buffer space has been allocated - * for the message. - */ - homa_grant_manage_rpc(rpc); } /** @@ -443,7 +435,8 @@ void homa_grant_insert_grantable(struct homa_rpc *rpc) * homa_grant_manage_rpc() - Insert an RPC into the priority-based data * structures for managing grantable RPCs (active_rpcs or grantable_peers). * Ensures that the RPC will be sent grants as needed. - * @rpc: The RPC to add. Must be locked by caller. + * @rpc: The RPC to add. Must be locked by caller. May already be + * inserted into the grant structures. */ void homa_grant_manage_rpc(struct homa_rpc *rpc) __must_hold(rpc->bucket->lock) @@ -452,10 +445,13 @@ void homa_grant_manage_rpc(struct homa_rpc *rpc) struct homa_rpc *bumped; u64 time = homa_clock(); - BUG_ON(rpc->msgin.rank >= 0 || !list_empty(&rpc->grantable_links)); - homa_grant_lock(grant); + if (rpc->msgin.rank >= 0 || !list_empty(&rpc->grantable_links)) { + homa_grant_unlock(grant); + return; + } + INC_METRIC(grantable_rpcs_integral, grant->num_grantable_rpcs * (time - grant->last_grantable_change)); grant->last_grantable_change = time; @@ -681,9 +677,10 @@ void homa_grant_send(struct homa_rpc *rpc, int priority) /** * homa_grant_check_rpc() - This function is responsible for generating - * grant packets. Is invoked whenever a data packet arrives for RPC; it - * checks the state of that RPC (as well as other RPCs) and generates - * grant packets as appropriate. 
+ * grant packets. It is invoked when the state of an RPC has changed in + * ways that might permit grants to be issued (either to this RPC or other + * RPCs), such as the arrival of a DATA packet. It reviews the state of + * grants and issues grant packets as appropriate. * @rpc: RPC to check. Must be locked by the caller. */ void homa_grant_check_rpc(struct homa_rpc *rpc) @@ -733,6 +730,17 @@ void homa_grant_check_rpc(struct homa_rpc *rpc) rpc->msgin.length); INC_METRIC(grant_check_calls, 1); + /* Races can cause the test below to invoke homa_grant_manage_rpc when + * rpc is already managed, but it will never fail to invoke + * homa_grant_manage_rpc if the RPC is unmanaged. This is an + * optimization to reduce the number of times the grant lock must + * be acquired. + */ + if (rpc->msgin.granted < rpc->msgin.length && + READ_ONCE(rpc->msgin.rank) < 0 && + list_empty(&rpc->grantable_links)) + homa_grant_manage_rpc(rpc); + needy_rank = INT_MAX; now = homa_clock(); homa_grant_update_incoming(rpc, grant); diff --git a/homa_pool.c b/homa_pool.c index 9bc6b33f..64f7b0c5 100644 --- a/homa_pool.c +++ b/homa_pool.c @@ -539,8 +539,7 @@ void homa_pool_check_waiting(struct homa_pool *pool) resend.length = htonl(-1); resend.priority = homa_high_priority(rpc->hsk->homa); homa_xmit_control(RESEND, &resend, sizeof(resend), rpc); - if (rpc->msgin.granted < rpc->msgin.length) - homa_grant_manage_rpc(rpc); + homa_grant_check_rpc(rpc); } #endif /* See strip.py */ homa_rpc_unlock(rpc); diff --git a/homa_rpc.h b/homa_rpc.h index 16750746..bf338a43 100644 --- a/homa_rpc.h +++ b/homa_rpc.h @@ -205,7 +205,7 @@ struct homa_message_in { /** * @prev_grant: Offset in the last GRANT packet sent for this RPC - * (initially set to unscheduled bytes). + * (initially set to unscheduled bytes). Managed by homa_grant.c. */ int prev_grant; @@ -421,8 +421,7 @@ struct homa_rpc { /** * @grantable_links: Used to link this RPC into peer->grantable_rpcs. * If this RPC isn't in peer->grantable_rpcs, this is an empty - * list pointing to itself. Must hold homa->grant->lock when - * accessing. + * list pointing to itself. Must hold homa->grant->lock when accessing. 
*/ struct list_head grantable_links; #endif /* See strip.py */ diff --git a/test/unit_homa_grant.c b/test/unit_homa_grant.c index 150edd99..0b9b590b 100644 --- a/test/unit_homa_grant.c +++ b/test/unit_homa_grant.c @@ -152,6 +152,7 @@ static struct homa_rpc *test_rpc_init(FIXTURE_DATA(homa_grant) *self, self->client_ip, server_ip, self->server_port, id, 1000, size); homa_message_in_init(rpc, size, 0); + homa_grant_manage_rpc(rpc); return rpc; } @@ -200,30 +201,18 @@ TEST_F(homa_grant, homa_grant_free__sysctls_not_registered) EXPECT_STREQ("", unit_log_get()); } -TEST_F(homa_grant, homa_grant_init_rpc__grants_not_needed) +TEST_F(homa_grant, homa_grant_init_rpc__basics) { struct homa_rpc *rpc; rpc= unit_client_rpc(&self->hsk, UNIT_OUTGOING, self->client_ip, self->server_ip, self->server_port, 100, 1000, 20000); - homa_message_in_init(rpc, 2000, 2000); + homa_message_in_init(rpc, 2000, 800); EXPECT_EQ(-1, rpc->msgin.rank); - EXPECT_EQ(2000, rpc->msgin.granted); -} -TEST_F(homa_grant, homa_grant_init_rpc__grants_needed) -{ - struct homa_rpc *rpc; - - rpc= unit_client_rpc(&self->hsk, UNIT_OUTGOING, self->client_ip, - self->server_ip, self->server_port, 100, 1000, - 20000); - - homa_message_in_init(rpc, 5000, 2000); - EXPECT_EQ(0, rpc->msgin.rank); - EXPECT_EQ(2000, rpc->msgin.granted); + EXPECT_EQ(800, rpc->msgin.granted); } -TEST_F(homa_grant, homa_grant_init_rpc__no_bpages_available) +TEST_F(homa_grant, homa_grant_init_rpc__unsched_past_end_of_message) { struct homa_rpc *rpc; @@ -231,11 +220,9 @@ TEST_F(homa_grant, homa_grant_init_rpc__no_bpages_available) self->server_ip, self->server_port, 100, 1000, 20000); - atomic_set(&self->hsk.buffer_pool->free_bpages, 0); - homa_message_in_init(rpc, 20000, 10000); - EXPECT_EQ(0, rpc->msgin.num_bpages); + homa_message_in_init(rpc, 2000, 3000); EXPECT_EQ(-1, rpc->msgin.rank); - EXPECT_EQ(10000, rpc->msgin.granted); + EXPECT_EQ(2000, rpc->msgin.granted); } TEST_F(homa_grant, homa_grant_end_rpc__basics) @@ -1079,15 +1066,16 @@ TEST_F(homa_grant, homa_grant_check_rpc__rpc_dead) EXPECT_EQ(0, rpc->msgin.granted); rpc->state = RPC_INCOMING; } -TEST_F(homa_grant, homa_grant_check_rpc__update_incoming_even_if_rpc_no_longer_active) +TEST_F(homa_grant, homa_grant_check_rpc__update_incoming_even_if_rpc_not_active) { - struct homa_rpc *rpc = unit_client_rpc(&self->hsk, UNIT_OUTGOING, - self->client_ip, self->server_ip, self->server_port, - 100, 1000, 2000); + struct homa_rpc *rpc; + + self->homa.grant->max_overcommit = 1; + test_rpc_init(self, 100, self->server_ip, 20000); + rpc = test_rpc_init(self, 102, self->server_ip, 30000); + + EXPECT_EQ(-1, rpc->msgin.rank); - homa_message_in_init(rpc, 2000, 0); - EXPECT_EQ(0, rpc->msgin.rank); - rpc->msgin.rank = -1; rpc->msgin.rec_incoming = 100; atomic_set(&self->homa.grant->total_incoming, 1000); unit_log_clear(); @@ -1160,13 +1148,13 @@ TEST_F(homa_grant, homa_grant_check_rpc__fast_path) } TEST_F(homa_grant, homa_grant_check_rpc__skip_fast_path_rpc_not_active) { - struct homa_rpc *rpc = unit_client_rpc(&self->hsk, UNIT_OUTGOING, - self->client_ip, self->server_ip, self->server_port, - 100, 1000, 20000); + struct homa_rpc *rpc; - homa_message_in_init(rpc, 20000, 0); - EXPECT_EQ(0, rpc->msgin.rank); - rpc->msgin.rank = -1; + self->homa.grant->max_overcommit = 1; + test_rpc_init(self, 100, self->server_ip, 20000); + rpc = test_rpc_init(self, 102, self->server_ip, 30000); + + EXPECT_EQ(-1, rpc->msgin.rank); unit_log_clear(); homa_rpc_lock(rpc); From 89a553519dd15ecb273cc55ed319c2f6c42a4df7 Mon Sep 17 00:00:00 2001 From: John 
Ousterhout Date: Tue, 27 Jan 2026 16:26:01 -0800 Subject: [PATCH 3/3] Add sync analyzer to tthoma.py --- util/tthoma.py | 548 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 547 insertions(+), 1 deletion(-) diff --git a/util/tthoma.py b/util/tthoma.py index 2995be2b..2fca9415 100755 --- a/util/tthoma.py +++ b/util/tthoma.py @@ -26,6 +26,7 @@ from statistics import stdev import string import sys +import tempfile import textwrap import time @@ -144,6 +145,7 @@ def __missing__(self, id): # first_time: Time of the first event read for this trace # last_time: Time of the last event read for this trace # elapsed_time: Total time interval covered by the trace +# records: Number of trace records in the file traces = {} # IP address -> node name. Computed by AnalyzeRpcs and AnalyzePackets. @@ -1655,6 +1657,7 @@ def parse(self, file): trace['file'] = file node = Path(file).stem trace['node'] = node + trace['records'] = 0 traces[node] = trace Dispatcher.cur_trace = trace @@ -1683,6 +1686,7 @@ def parse(self, file): trace['first_time'] = t first = False trace['last_time'] = t + trace['records'] += 1 prefix = msg[0:self.prefix_length] if prefix in self.parse_table: for pattern in self.parse_table[prefix]: @@ -1829,6 +1833,19 @@ def __gro_grant(self, trace, time, core, match, interests): 'offset ([0-9]+), priority ([0-9]+)' }) + def __gro_ctl(self, trace, time, core, match, interests): + peer = match.group(1) + id = int(match.group(2)) + type = match.group(3) + for interest in interests: + interest.tt_gro_ctl(trace, time, core, peer, id, type) + + patterns.append({ + 'name': 'gro_ctl', + 'regexp': 'homa_gro_receive got packet from (0x[0-9a-f]+) id ([0-9]+), ' + 'type (0x[0-9a-f]+)' + }) + def __softirq_data(self, trace, time, core, match, interests): id = int(match.group(1)) offset = int(match.group(2)) @@ -2626,6 +2643,52 @@ def __txq_restart(self, trace, time, core, match, interests): 'regexp': r'netdev_tx_completed_queue restarted queue (0x[a-f0-9]+)' }) + def __freeze_tx(self, trace, time, core, match, interests): + daddr = match.group(1) + for interest in interests: + interest.tt_freeze_tx(trace, time, core, daddr) + + patterns.append({ + 'name': 'freeze_tx', + 'regexp': r'Sending freeze to (0x[a-f0-9]+)' + }) + + def __freeze_rx(self, trace, time, core, match, interests): + saddr = match.group(1) + sport = int(match.group(2)) + for interest in interests: + interest.tt_freeze_rx(trace, time, core, saddr, sport) + + patterns.append({ + 'name': 'freeze_rx', + 'regexp': r'Freezing because of request on port .* from ' + r'(0x[a-f0-9]+):([0-9]+),' + }) + + def __resend_tx(self, trace, time, core, match, interests): + id = int(match.group(1)) + peer = match.group(2) + offset = int(match.group(3)) + length = int(match.group(4)) + for interest in interests: + interest.tt_resend_tx(trace, time, core, peer, id, offset, length) + + patterns.append({ + 'name': 'resend_tx', + 'regexp': r'Sending RESEND for id ([0-9]+), peer (0x[a-f0-9]+), ' + r'offset ([0-9]+), length ([0-9]+),' + }) + + def __busy_tx(self, trace, time, core, match, interests): + id = int(match.group(1)) + for interest in interests: + interest.tt_busy_tx(trace, time, core, id) + + patterns.append({ + 'name': 'busy_tx', + 'regexp': r'sending BUSY from resend, id ([0-9]+),' + }) + #------------------------------------------------ # Analyzer: activity #------------------------------------------------ @@ -10958,6 +11021,481 @@ def output(self): start, end, node = smi print('%9.3f %9.3f %6.1f %s' % (start, end, end - start, node)) 
+#------------------------------------------------
+# Analyzer: sync
+#------------------------------------------------
+class AnalyzeSync:
+    """
+    Rewrite the timetrace files so that their clocks are synchronized.
+    To do this, analyze the timestamps for packet sends and receives and
+    use this information to compute clock offsets between the nodes. If
+    --no-update is specified then clock offset information is printed but
+    trace files are not modified.
+    """
+
+    def __init__(self, dispatcher):
+        # rpc_id:offset[g] -> [time, node] for each Homa data packet or grant
+        # transmission. rpc_id is the id on the sender and node is the node
+        # that sent the packet. A 'g' suffix after the offset indicates that
+        # the packet was a grant. If a packet is sent multiple times, time is
+        # the earliest send time (avoids confusion caused by retransmissions).
+        self.tx_pkts = {}
+
+        # rpc_id:offset[g] -> [time, node] for each Homa data packet or grant
+        # that was received. rpc_id is the id on the sender and node is the
+        # node that received the packet. A 'g' suffix after the offset indicates
+        # that the packet was a grant. If a packet is received multiple times,
+        # time is the latest receive time.
+        self.rx_pkts = {}
+
+        # node -> rpc_id -> [times]. For each node, contains
+        # a dictionary mapping from RPC identifiers to a list of unadjusted
+        # times when a busy or resend packet was transmitted for rpc_id.
+        # Rpc_id is the id on the sender.
+        self.ctl_tx = defaultdict(lambda: defaultdict(list))
+
+        # rpc_id -> times. Times is a list of unadjusted times when resend or
+        # busy packets were received for rpc_id (rpc_id is the id on the
+        # receiver).
+        self.ctl_rx = defaultdict(list)
+
+        # pkt_id -> [time, node] for each TCP packet sent. pkt_id is a unique
+        # identifier for the packet: source:dest:sequence:bytes. If the same id
+        # is transmitted multiple times, the first time is used.
+        self.tcp_tx = {}
+
+        # pkt_id -> [time, node] for each TCP packet received. pkt_id has the
+        # same structure as for tcp_tx. If the same id is received multiple
+        # times, the last time is used.
+        self.tcp_rx = {}
+
+        # List of [time, snode, raddr] with one entry for each FREEZE packet
+        # sent. Time is the unadjusted time on the sender when the packet was
+        # sent. snode is the sending node, raddr is the receiver's IP address.
+        self.freeze_tx = []
+
+        # node -> [time, saddr]. time is the unadjusted time on node
+        # when the last freeze packet was received by it. saddr is the sender
+        # IP address.
+        self.freeze_rx = {}
+
+        # Node -> dictionary with packet counts for that node:
+        # homa_tx:    Homa data packets sent
+        # homa_rx:    Homa data packets received
+        # grant_tx:   Homa grant packets sent
+        # grant_rx:   Homa grant packets received
+        # ctl_tx:     Homa busy/resend packets sent
+        # ctl_rx:     Homa busy/resend packets received
+        # freeze_tx:  Homa freeze packets sent
+        # freeze_rx:  Homa freeze packets received
+        # tcp_tx:     TCP packets sent
+        # tcp_rx:     TCP packets received
+        self.node_pkts = defaultdict(lambda: defaultdict(lambda: 0))
+
+        # rpc_id -> IP address of the node that processed the RPC with that id.
+        # Needed to compute addr_node.
+        self.id_addr = {}
+
+        # rpc_id -> node that processed the RPC with that id. Used to compute
+        # addr_node.
+        self.id_node = {}
+
+        # IP address -> node name; computed from id_addr and id_node.
+        self.addr_node = {}
+
+        # Node name -> node id (position in get_sorted_nodes()).
+ self.node_id = {} + + def tt_ip_xmit(self, trace, t, core, id, offset): + node = trace['node'] + key = '%d:%d' % (id, offset) + if not key in self.tx_pkts: + self.tx_pkts[key] = [t, node] + self.node_pkts[node]['homa_tx'] += 1 + self.id_node[id] = node + + def tt_gro_data(self, trace, t, core, peer, id, offset, prio): + node = trace['node'] + key = '%d:%d' % (id ^ 1, offset) + self.rx_pkts[key] = [t, node] + self.node_pkts[node]['homa_rx'] += 1 + self.id_node[id] = node + self.id_addr[id ^ 1] = peer + + def tt_send_grant(self, trace, t, core, id, offset, priority, increment): + node = trace['node'] + key = '%d:%dg' % (id, offset) + if not key in self.tx_pkts: + self.tx_pkts[key] = [t, node] + self.node_pkts[node]['grant_tx'] += 1 + + def tt_gro_grant(self, trace, t, core, peer, id, offset, prio): + node = trace['node'] + key = '%d:%dg' % (id ^ 1, offset) + self.rx_pkts[key] = [t, node] + self.node_pkts[node]['grant_rx'] += 1 + self.id_addr[id ^ 1] = peer + + def tt_gro_ctl(self, trace, t, core, peer, id, type): + node = trace['node'] + if type == '0x16': + self.freeze_rx[node] = [t, peer] + self.node_pkts[node]['freeze_rx'] += 1 + elif type == '0x12' or type == '0x14': + self.ctl_rx[id].append(t) + self.node_pkts[node]['ctl_rx'] += 1 + self.id_node[id] = node + self.id_addr[id ^ 1] = peer + + def tt_busy_tx(self, trace, t, core, id): + node = trace['node'] + self.ctl_tx[node][id].append(t) + self.node_pkts[node]['ctl_tx'] += 1 + self.id_node[id] = node + + def tt_resend_tx(self, trace, t, core, peer, id, offset, length): + node = trace['node'] + self.ctl_tx[node][id].append(t) + self.node_pkts[node]['ctl_tx'] += 1 + self.id_node[id] = node + self.id_addr[id ^ 1] = peer + + def tt_tcp_xmit(self, trace, t, core, source, dest, data_bytes, seq_ack): + node = trace['node'] + id = '%s:%s:%s:%s' % (source, dest, seq_ack, data_bytes) + if not id in self.tcp_tx: + self.tcp_tx[id] = [t, node] + self.node_pkts[node]['tcp_tx'] += 1 + + def tt_tcp_gro(self, trace, t, core, source, dest, data_bytes, seq_ack): + node = trace['node'] + id = '%s:%s:%s:%s' % (source, dest, seq_ack, data_bytes) + self.tcp_rx[id] = [t, node] + self.node_pkts[node]['tcp_rx'] += 1 + + def tt_freeze_tx(self, trace, t, core, daddr): + node = trace['node'] + self.freeze_tx.append([t, node, daddr]) + self.node_pkts[node]['freeze_tx'] += 1 + + def analyze(self): + """ + Do some processing of the data collected so far. + """ + + # Compute the mapping from address to node. + for id, addr in self.id_addr.items(): + if id in self.id_node: + self.addr_node[addr] = self.id_node[id] + + # Compute the mapping from node name to id. + nodes = get_sorted_nodes() + for i in range(len(nodes)): + self.node_id[nodes[i]] = i + + def find_min_delays(self): + """ + Return a list containing two elements: + + min_delays: a two-level list. min_delays[src][dst] gives the + smallest observed delay for a packet to get from src to dst, as + measured with the node's unadjusted clocks (the delay in one direction + between two nodes may be negative due to clock misalignment). src + and dst are node indexes within the result of get_sorted_nodes(). + An element may be None if there were no packets between the two nodes. + + min_times: a two-level list like min_delays; each entry is a list + containing the transmit and receive times (unadjusted clocks) for + the packet that produced the entry in min_delays. This is computed + for debugging purposes and may not actually be used. 
+        """
+
+        num_nodes = len(get_sorted_nodes())
+        min_delays = [[None for _ in range(num_nodes)]
+                for _ in range(num_nodes)]
+        min_times = [[None for _ in range(num_nodes)]
+                for _ in range(num_nodes)]
+
+        for key, tx_info in self.tx_pkts.items():
+            if not key in self.rx_pkts:
+                continue
+            tx_time, tx_node = tx_info
+            tx_id = self.node_id[tx_node]
+            rx_time, rx_node = self.rx_pkts[key]
+            rx_id = self.node_id[rx_node]
+            delay = rx_time - tx_time
+            cur = min_delays[tx_id][rx_id]
+            if cur == None or delay < cur:
+                min_delays[tx_id][rx_id] = delay
+                min_times[tx_id][rx_id] = [tx_time, rx_time]
+
+        for key, tx_info in self.tcp_tx.items():
+            if not key in self.tcp_rx:
+                continue
+            tx_time, tx_node = tx_info
+            tx_id = self.node_id[tx_node]
+            rx_time, rx_node = self.tcp_rx[key]
+            rx_id = self.node_id[rx_node]
+            delay = rx_time - tx_time
+            cur = min_delays[tx_id][rx_id]
+            if cur == None or delay < cur:
+                min_delays[tx_id][rx_id] = delay
+                min_times[tx_id][rx_id] = [tx_time, rx_time]
+
+        return min_delays, min_times
+
+    def find_min_delays_alt(self, min_delays, min_times):
+        """
+        Update the information in min_delays and min_times using resend
+        and busy packets. This is useful in situations where the cluster has
+        stalled so there aren't any data/grant packets.
+        min_delays:   As returned by find_min_delays.
+        min_times:    As returned by find_min_delays.
+
+        Note: in January 2026 this method was transcribed from the old
+        ttsync.py program. However, there was no trace data available at the
+        time to test it, so it is probably buggy.
+        """
+
+        # Resend and busy packets are problematic because they are not unique:
+        # there can be several identical packets between the same pair of nodes.
+        # Here's how this function matches up sends and receives:
+        # * Start from freeze packets, which are unique; use them to compute
+        #   an upper bound on delays in one direction.
+        # * Then scan packets flowing in the other direction: match sends and
+        #   receives to pick the pair that produces the smallest positive RTT
+        #   (when combined with freeze info in the other direction).
+        # * Then use this minimum in the other direction to match sends and
+        #   receives in the same direction as the freeze, to get a tighter bound
+        #   than the freeze could produce by itself.
+
+        for send_time, fsend_node, raddr in self.freeze_tx:
+            # Compute freeze delay.
+            if not raddr in self.addr_node:
+                continue
+            frecv_node = self.addr_node[raddr]
+            if not frecv_node in self.freeze_rx:
+                continue
+            recv_time, saddr = self.freeze_rx[frecv_node]
+            if (not saddr in self.addr_node or
+                    not self.addr_node[saddr] == fsend_node):
+                continue
+            freeze_delay = recv_time - send_time
+            ftx_nid = self.node_id[fsend_node]
+            frx_nid = self.node_id[frecv_node]
+            if (min_delays[ftx_nid][frx_nid] == None or
+                    freeze_delay < min_delays[ftx_nid][frx_nid]):
+                min_delays[ftx_nid][frx_nid] = freeze_delay
+                min_times[ftx_nid][frx_nid] = [send_time, recv_time]
+
+            # Scan control packets in reverse direction from freeze.
+            min_delay = min_delays[frx_nid][ftx_nid]
+            for id, send_times in self.ctl_tx[frecv_node].items():
+                rx_id = id ^ 1
+                if (not rx_id in self.id_node or
+                        self.id_node[rx_id] != fsend_node):
+                    continue
+                for send in send_times:
+                    for recv in self.ctl_rx[rx_id]:
+                        delay = recv - send
+                        if freeze_delay + delay > 0 and (min_delay == None
+                                or delay < min_delay):
+                            min_delay = delay
+                            min_delays[frx_nid][ftx_nid] = delay
+                            min_times[frx_nid][ftx_nid] = [send, recv]
+
+            # Scan control packets in same direction as freeze.
+ reverse_delay = min_delay + if reverse_delay == None: + continue + min_delay = min_delays[ftx_nid][frx_nid] + for id, send_times in self.ctl_tx[fsend_node].items(): + rx_id = id ^ 1 + if (not rx_id in self.id_node or + self.id_node[rx_id] != frecv_node): + continue + for send in send_times: + for recv in self.ctl_rx[rx_id]: + delay = recv - send + if reverse_delay + delay > 0 and (min_delay == None or + delay < min_delay): + min_delay = delay + min_delays[ftx_nid][frx_nid] = delay + min_times[ftx_nid][frx_nid] = [send, recv] + + def get_offsets(self, min_delays): + """ + Compute clock offsets for each node. The min_delays parameter is + a two-level list as described for the min_delays result from + find_min_delays. The result is a list with three elements: + + min_offsets: a list with one element for each node in get_sorted_nodes(), + containing the minimum valid clock offset for that node, or None if + no minimum offset could be computed. + + max_offsets: the maximum valid clock offset for each node, or None if + no maximum offset could be computed. If max_offset[i] < min_offset[i] + it means there were errors in the input data. + + stats: a list containing one element for each iteration of the + computation, indicating how many times a min_offset or max_offset + value was modified during that iteration. + """ + + min_offsets = [None for _ in range(len(min_delays))] + max_offsets = [None for _ in range(len(min_delays))] + min_offsets[0] = 0 + max_offsets[0] = 0 + stats = [] + + # Each iteration of the following loop compares each node against + # each other node to update min_offset and max_offset values. The + # loop terminates when no more updates can be made. + while True: + num_updates = 0 + for i in range(len(min_offsets)): + for j in range(1, len(min_offsets)): + # Given the valid range of offsets for node i, update the + # valid range for node j to ensure that a packet cannot + # arrive before it was sent. 
+ delay_from = min_delays[i][j] + if min_offsets[i] != None and delay_from != None: + min = min_offsets[i] - delay_from + # print(' Min from %d to %d: min[%d] %.1f, delay %.1f, min %.1f, min[%d] %s' + # % (i, j, i, min_offsets[i], delay_from, min, + # j, min_offsets[j])) + if min_offsets[j] == None or min_offsets[j] < min: + # print('Updated min[%d] to %.1f' % (j, min)) + num_updates += 1 + min_offsets[j] = min + + delay_to = min_delays[j][i] + if max_offsets[i] != None and delay_to != None: + max = max_offsets[i] + delay_to + # print(' Max from %d to %d: max[%d] %.1f, delay %.1f, max %.1f, max[%d] %s' + # % (i, j, i, max_offsets[i], delay_to, max, + # j, max_offsets[j])) + if max_offsets[j] == None or max_offsets[j] > max: + # print('Updated max[%d] to %.1f' % (j, max)) + num_updates += 1 + max_offsets[j] = max + stats.append(num_updates) + if num_updates == 0: + break + return min_offsets, max_offsets, stats + + def output(self): + global traces, options + + print('\n--------------') + print('Analyzer: sync') + print('--------------') + + print() + print('Trace file statistics:') + print('File: Name of trace file') + print('Span: Elapsed time between first and last timetrace records (ms)') + print('HomaTx: Total number of Homa data packets sent') + print('HomaRx: Total number of Homa data packets received (there ' + 'will be more') + print(' packets received than sent due to TSO on senders)') + print('GrTx: Total number of Homa grant packets sent') + print('GrRx: Total number of Homa grant packets received') + print('CtlTx: Total number of Homa busy/resend packets sent') + print('CtlRx: Total number of Homa busy/resend packets received') + print('FrTx: Total number of Homa freeze packets sent') + print('FrRx: Total number of Homa freeze packets received') + print('TcpTx: Total number of TCP packets sent') + print('TcpRx: Total number of TCP packets received') + print('\nFile Span HomaTx HomaRx GrTx GrRx ' + 'CtlTx CtlRx FrTx FrRx TcpTx TcpRx') + + nodes = get_sorted_nodes() + for node in nodes: + trace = traces[node] + print('%-12s %5.1f %6d %6d %5d %5d %5d %5d %4d %4d %6d %6d' % ( + trace['file'], + (trace['last_time'] - trace['first_time'])/1000, + self.node_pkts[node]['homa_tx'], + self.node_pkts[node]['homa_rx'], + self.node_pkts[node]['grant_tx'], + self.node_pkts[node]['grant_rx'], + self.node_pkts[node]['ctl_tx'], + self.node_pkts[node]['ctl_rx'], + self.node_pkts[node]['freeze_tx'], + self.node_pkts[node]['freeze_rx'], + self.node_pkts[node]['tcp_tx'], + self.node_pkts[node]['tcp_rx'])) + + min_delays, min_times = self.find_min_delays() + self.find_min_delays_alt(min_delays, min_times) + min_offsets, max_offsets, stats = self.get_offsets(min_delays) + + print('\nMin/max updates made in each round of the offset ' + 'calculation:') + print('Round Updates') + for i in range(len(stats)): + print('%3d %6d' % (i, stats[i])) + + print('\nClock offsets for each node:') + print('Node: Name of node') + print('Min: Minimum allowable Offset for Node (usecs)') + print('Max: Maximum allowable Offset for Node (usecs)') + print('Range: Max - Min (negative means error in data)') + print('Offset: Add this to clock values for Node') + print('\nNode Min Max Range Offset') + bad_nodes = [] + offsets = [] + for i in range(len(nodes)): + min_str = 'N/A' + max_str = 'N/A' + rng_str = 'N/A' + offset_str = 'None' + offset = None + if min_offsets[i] != None: + min_str = '%.1f' % min_offsets[i] + if max_offsets[i] != None: + max_str = '%.1f' % max_offsets[i] + if min_offsets[i] != None and max_offsets[i] != 
None:
+                rng = max_offsets[i] - min_offsets[i]
+                rng_str = '%.1f' % rng
+                if rng >= 0:
+                    offset = (min_offsets[i] + max_offsets[i]) / 2
+                    offset_str = '%.1f' % offset
+            print('%-10s %8s %8s %8s %8s' % (nodes[i], min_str, max_str,
+                    rng_str, offset_str))
+            offsets.append(offset)
+            if offset == None:
+                bad_nodes.append(nodes[i])
+
+        if bad_nodes:
+            raise Exception('Couldn\'t compute clock offsets for the '
+                    'following nodes: %s' % (', '.join(bad_nodes)))
+
+        # Rewrite traces with synchronized times
+        if options.update:
+            print("")
+            nodes = get_sorted_nodes()
+            for i in range(1, len(nodes)):
+                node = nodes[i]
+                offset = offsets[i]
+                file = traces[node]['file']
+                print("Rewriting %s with offset %.1f usec" % (file, offset),
+                        file=sys.stderr)
+                src = open(file)
+                dst = tempfile.NamedTemporaryFile(dir=os.path.dirname(file),
+                        mode='w', encoding='utf-8', delete=False)
+                for line in src:
+                    match = re.match(r' *([-0-9.]+) us (\(\+ *[-0-9.]+ us\) \[C[0-9]+\].*)',
+                            line)
+                    if not match:
+                        dst.write(line)
+                    else:
+                        time = float(match.group(1)) + offset
+                        dst.write('%9.3f us %s\n' % (time, match.group(2)))
+                dst.close()
+                os.rename(dst.name, file)
+
 #------------------------------------------------
 # Analyzer: tcp_rpcs
 #------------------------------------------------
@@ -11146,7 +11684,7 @@ def output(self):
         global tcp_rpcs, options
 
         print('\n------------------')
-        print('Analyzer: rpcs')
+        print('Analyzer: tcp_rpcs')
         print('------------------')
 
         if (options.msglen != None or options.rpc_start != None or
@@ -12782,6 +13320,10 @@ def output(self):
 parser.add_option('--negative-ok', action='store_true', default=False,
         dest='negative_ok',
         help='Don\'t print warnings when negative delays are encountered')
+parser.add_option('--no-update', action='store_false', default=True,
+        dest='update',
+        help='Don\'t modify any files; some analyzers (such as sync) '
+        'rewrite files by default')
 parser.add_option('--node', dest='node', default=None, metavar='N',
         help='Specifies a particular node (the name of its '
         'trace file without the extension); required by some analyzers')
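
The core of the new sync analyzer is the constraint propagation in AnalyzeSync.get_offsets(): each observed one-way delay bounds the difference between two nodes' clock offsets, and the analyzer repeatedly tightens a [min, max] range per node until nothing changes. Below is a standalone sketch of that idea (not part of the patch). The delay matrix is invented: it corresponds to node 1 running about 4 usec behind node 0 and node 2 running about 3.5 usec ahead, with roughly 1 usec of true network delay.

    # Toy version of the min/max offset propagation in AnalyzeSync.get_offsets.
    # delays[i][j] is the smallest observed one-way delay from node i to node j,
    # measured with unadjusted clocks (hypothetical values; negative entries
    # reflect clock skew). Node 0 is the reference clock.
    delays = [
        [None, -3.0,  4.5],
        [ 5.0, None,  8.5],
        [-2.5, -6.5, None],
    ]

    num_nodes = len(delays)
    min_off = [None] * num_nodes
    max_off = [None] * num_nodes
    min_off[0] = max_off[0] = 0.0

    # Once offsets are applied, a packet must not arrive before it was sent:
    #   rx + off[j] >= tx + off[i]  =>  off[j] >= off[i] - delay(i -> j)
    #                               and off[j] <= off[i] + delay(j -> i)
    while True:
        updates = 0
        for i in range(num_nodes):
            for j in range(1, num_nodes):
                d_ij = delays[i][j]
                if min_off[i] is not None and d_ij is not None:
                    lo = min_off[i] - d_ij
                    if min_off[j] is None or min_off[j] < lo:
                        min_off[j] = lo
                        updates += 1
                d_ji = delays[j][i]
                if max_off[i] is not None and d_ji is not None:
                    hi = max_off[i] + d_ji
                    if max_off[j] is None or max_off[j] > hi:
                        max_off[j] = hi
                        updates += 1
        if updates == 0:
            break

    for n in range(num_nodes):
        mid = (min_off[n] + max_off[n]) / 2
        print('node %d: offset range [%.1f, %.1f], use %.1f usec'
                % (n, min_off[n], max_off[n], mid))

With these numbers the sketch prints a [3.0, 5.0] range for node 1 and [-4.5, -2.5] for node 2; like get_offsets(), it then takes the midpoint of each range (4.0 and -3.5) as the offset to add to that node's clock.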
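
The trace-rewriting step at the end of AnalyzeSync.output() shifts only the leading timestamp of each timetrace record; the rest of the line, including the '(+ ... us)' delta and the core number, is copied unchanged. A small illustration using the same regular expression (the trace line and the offset below are made up):

    import re

    # Hypothetical timetrace record and clock offset for its node.
    line = '  1234.567 us (+   0.012 us) [C03] homa_gro_receive got packet'
    offset = 41.3

    # Same pattern that the sync analyzer uses when rewriting trace files.
    pattern = r' *([-0-9.]+) us (\(\+ *[-0-9.]+ us\) \[C[0-9]+\].*)'
    match = re.match(pattern, line)
    if match:
        new_time = float(match.group(1)) + offset
        print('%9.3f us %s' % (new_time, match.group(2)))
        # Prints: ' 1275.867 us (+   0.012 us) [C03] homa_gro_receive got packet'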