diff --git a/datagram_queue.go b/datagram_queue.go index b443a55e415..0c3418d10dd 100644 --- a/datagram_queue.go +++ b/datagram_queue.go @@ -90,7 +90,7 @@ func (h *datagramQueue) Peek() *wire.DatagramFrame { func (h *datagramQueue) dequeueNextFrame() *wire.DatagramFrame { h.nextFrame.peekCount++ - if h.nextFrame.peekCount > maxPeekAttempt { + if h.nextFrame.peekCount >= maxPeekAttempt { h.Pop(&DatagramQueuedTooLong{}) return nil } diff --git a/datagram_queue_test.go b/datagram_queue_test.go index 80ecf5bc549..8f7d3aa27a2 100644 --- a/datagram_queue_test.go +++ b/datagram_queue_test.go @@ -76,7 +76,7 @@ var _ = Describe("Datagram Queue", func() { errChan <- queue.AddAndWait(f) }() Eventually(queued).Should(HaveLen(1)) - for i := 0; i < int(maxPeekAttempt); i++ { + for i := 0; i < int(maxPeekAttempt-1); i++ { Expect(queue.Peek()).To(Equal(f)) } Expect(queue.Peek()).To(BeNil()) diff --git a/packet_packer_test.go b/packet_packer_test.go index e68906f39f1..e273d402ceb 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -30,6 +30,7 @@ var _ = Describe("Packet packer", func() { var ( packer *packetPacker retransmissionQueue *retransmissionQueue + datagramQueued chan struct{} datagramQueue *datagramQueue framer *MockFrameSource ackFramer *MockAckFrameSource @@ -95,7 +96,10 @@ var _ = Describe("Packet packer", func() { ackFramer = NewMockAckFrameSource(mockCtrl) sealingManager = NewMockSealingManager(mockCtrl) pnManager = mockackhandler.NewMockSentPacketHandler(mockCtrl) - datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger) + datagramQueued = make(chan struct{}, 100) + datagramQueue = newDatagramQueue(func() { + datagramQueued <- struct{}{} + }, utils.DefaultLogger) packer = newPacketPacker(protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}), func() protocol.ConnectionID { return connID }, initialStream, handshakeStream, pnManager, retransmissionQueue, sealingManager, framer, ackFramer, datagramQueue, protocol.PerspectiveServer) }) @@ -589,6 +593,7 @@ var _ = Describe("Packet packer", func() { Expect(p.Frames[0].Frame).To(Equal(f)) Expect(buffer.Data).ToNot(BeEmpty()) Eventually(done).Should(BeClosed()) + }) It("doesn't pack a DATAGRAM frame if the ACK frame is too large", func() { @@ -621,6 +626,59 @@ var _ = Describe("Packet packer", func() { Eventually(done).Should(BeClosed()) }) + It("large datagram doesn't block send queue indefinitely", func() { + for i := 0; i < int(maxPeekAttempt+1); i++ { + ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, true).Return(&wire.AckFrame{AckRanges: []wire.AckRange{{Largest: 100}}}) + pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2) + pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42)) + sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil) + framer.EXPECT().HasData() + } + largeFrame := &wire.DatagramFrame{ + DataLenPresent: true, + Data: make([]byte, maxPacketSize), + } + smallFrame := &wire.DatagramFrame{ + DataLenPresent: true, + Data: make([]byte, 10), + } + largeSender := make(chan error) + go func() { + defer GinkgoRecover() + largeSender <- datagramQueue.AddAndWait(largeFrame) + }() + + smallerSender := make(chan error) + go func() { + defer GinkgoRecover() + Eventually(datagramQueued).Should(HaveLen(1)) + smallerSender <- datagramQueue.AddAndWait(smallFrame) + }() + // make sure the DATAGRAM has actually been queued + time.Sleep(scaleDuration(20 * time.Millisecond)) + + buffer := getPacketBuffer() + for i := 0; i < int(maxPeekAttempt); i++ { + p, err := packer.AppendPacket(buffer, maxPacketSize, protocol.Version1) + Expect(p).ToNot(BeNil()) + Expect(err).ToNot(HaveOccurred()) + Expect(p.Ack).ToNot(BeNil()) + Expect(p.Frames).To(BeEmpty()) + Expect(buffer.Data).ToNot(BeEmpty()) + } + Eventually(largeSender).Should(Receive(Equal(&DatagramQueuedTooLong{}))) + + Eventually(datagramQueued).Should(HaveLen(2)) + p, err := packer.AppendPacket(buffer, maxPacketSize, protocol.Version1) + Expect(p).ToNot(BeNil()) + Expect(err).ToNot(HaveOccurred()) + Expect(p.Ack).ToNot(BeNil()) + Expect(p.Frames[0].Frame).To(Equal(smallFrame)) + Expect(buffer.Data).ToNot(BeEmpty()) + Eventually(smallerSender).Should(Receive(BeNil())) + datagramQueue.CloseWithError(nil) + }) + It("accounts for the space consumed by control frames", func() { pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2) sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil)