-
Notifications
You must be signed in to change notification settings - Fork 0
/
Program.cs
200 lines (173 loc) · 6.95 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
using System.Diagnostics;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
// Broker endpoint this producer connects to.
const string hostName = "haproxy.local";
const ushort port = 5672;

// Names of the exchange and queue that Publish() declares and binds.
const string exchangeName = "rmq-dotnet-client-1584-exchange";
const string queueName = "rmq-dotnet-client-1584";

// Signalled by the CTRL-C handler; the startup wait and the publish loop wait on it.
var latch = new AutoResetEvent(initialState: false);

// Cleared by the CTRL-C handler; the outer reconnect loop stops once it is false.
var running = true;
// Console.CancelKeyPress handler: requests an orderly shutdown instead of letting
// CTRL-C terminate the process immediately.
void CancelHandler(object? sender, ConsoleCancelEventArgs e)
{
    // Cancel the default CTRL-C behaviour; the main loop exits on its own.
    e.Cancel = true;
    Console.WriteLine("[INFO] CTRL-C pressed, exiting!");

    // Clear the flag before signalling, so a waiter woken by the latch
    // already sees running == false.
    running = false;
    latch.Set();
}
// Route CTRL-C to CancelHandler.
Console.CancelKeyPress += CancelHandler;

// Wait 10 seconds before the first connection attempt; the latch being
// signalled during that wait means CTRL-C was pressed, so exit right away.
Console.WriteLine("[INFO] PRODUCER: waiting 10 seconds to try initial connection to RabbitMQ");
var startupDelay = TimeSpan.FromSeconds(10);
bool cancelledDuringStartup = latch.WaitOne(startupDelay);
if (cancelledDuringStartup)
{
    Console.WriteLine("[INFO] PRODUCER EXITING");
    Environment.Exit(0);
}

// Connection settings: automatic connection and topology recovery are both enabled.
var factory = new ConnectionFactory
{
    ClientProvidedName = "PRODUCER",
    HostName = hostName,
    Port = port,
    AutomaticRecoveryEnabled = true,
    TopologyRecoveryEnabled = true,
};

// Connection state shared by Connect(), Publish() and the main loop.
var connected = false;
IConnection? connection = null;
// Tries to open a connection to the broker, retrying every 5 seconds while the
// broker is unreachable.
//
// Returns when either:
//   * a connection was created (`connected` is true and `connection` is set), or
//   * shutdown was requested via CTRL-C (`running` is false); in that case
//     `connected` stays false and `connection` is left as it was on entry.
//
// Only BrokerUnreachableException is retried; any other exception thrown by
// CreateConnection() propagates to the caller.
void Connect()
{
    var retryDelay = TimeSpan.FromSeconds(5);

    // Checking `running` is essential: the CTRL-C handler cancels the default
    // process termination, so without it an unreachable broker would make the
    // program impossible to stop with CTRL-C.
    while (!connected && running)
    {
        try
        {
            connection = factory.CreateConnection();
            connected = true;
        }
        catch (BrokerUnreachableException)
        {
            connected = false;
            Console.WriteLine("[INFO] PRODUCER: waiting 5 seconds to re-try connection!");

            // Back off on the latch instead of sleeping so that CTRL-C
            // interrupts the wait immediately.
            if (latch.WaitOne(retryDelay))
            {
                Console.WriteLine("[INFO] PRODUCER: shutdown requested, abandoning connection attempts");
                break;
            }
        }
    }
}
// Publishes one 1024-byte random message per second on the current `connection`
// and waits for the broker to confirm each one.
//
// Setup: subscribes logging handlers to connection/channel events, declares a
// durable topic exchange and a durable quorum queue, binds them with the
// "update.*" routing pattern and enables publisher confirms.
//
// The method returns (after disposing the channel and the connection) when:
//   * the latch is signalled (CTRL-C),
//   * a shutdown was observed and the loop has then ticked more than 60 times
//     (one-second ticks) without a successful recovery, or
//   * any exception other than AlreadyClosedException escapes the publish loop
//     (it is logged by the outer catch).
//
// A null `connection` is logged as an error and nothing is published.
void Publish()
{
    var latchSpan = TimeSpan.FromSeconds(1);

    // Set by the shutdown handlers and cleared by the recovery handler below;
    // the publish loop only reads it.
    bool maybeExit = false;

    try
    {
        byte[] buffer = new byte[1024];

        // Disposes the connection when the block exits; `using` accepts a null resource.
        using (connection)
        {
            if (connection is null)
            {
                Console.Error.WriteLine("[ERROR] PRODUCER: unexpected null connection");
            }
            else
            {
                if (connection is IAutorecoveringConnection autorecoveringConnection)
                {
                    autorecoveringConnection.RecoverySucceeded += (s, ea) =>
                    {
                        Console.WriteLine("[INFO] PRODUCER: connection recovery succeeded!");
                        maybeExit = false;
                    };
                }

                connection.CallbackException += (s, ea) =>
                {
                    Console.Error.WriteLine($"[ERROR] PRODUCER: connection.CallbackException: {ea}");
                };
                connection.ConnectionBlocked += (s, ea) =>
                {
                    Console.Error.WriteLine($"[WARNING] PRODUCER: connection.ConnectionBlocked: {ea}");
                };
                connection.ConnectionUnblocked += (s, ea) =>
                {
                    Console.WriteLine($"[INFO] PRODUCER: connection.ConnectionUnblocked: {ea}");
                };
                connection.ConnectionShutdown += (s, ea) =>
                {
                    Console.Error.WriteLine($"[WARNING] PRODUCER: connection.ConnectionShutdown: {ea}");
                    maybeExit = true;
                };

                using (var channel = connection.CreateModel())
                {
                    channel.CallbackException += (s, ea) =>
                    {
                        Console.Error.WriteLine($"[ERROR] PRODUCER: channel.CallbackException: {ea}");
                    };
                    channel.ModelShutdown += (s, ea) =>
                    {
                        Console.Error.WriteLine($"[WARNING] PRODUCER: channel.ModelShutdown: {ea}");
                        maybeExit = true;
                    };

                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic, durable: true);

                    var args = new Dictionary<string, object>
                    {
                        ["x-queue-type"] = "quorum"
                    };
                    var queueDeclareResult = channel.QueueDeclare(queue: queueName, durable: true,
                        exclusive: false, autoDelete: false, arguments: args);
                    Debug.Assert(queueName == queueDeclareResult.QueueName);

                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "update.*");
                    channel.ConfirmSelect();

                    // Counts consecutive loop ticks spent waiting after a shutdown was observed.
                    int maybeExitTries = 0;

                    // WaitOne doubles as the one-second publish interval and the CTRL-C check.
                    while (!latch.WaitOne(latchSpan))
                    {
                        if (!maybeExit && connection.IsOpen)
                        {
                            maybeExitTries = 0;
                            Random.Shared.NextBytes(buffer);

                            // Invariant culture keeps the '/' and ':' separators fixed
                            // regardless of the machine's locale.
                            string now = DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.ffffff",
                                System.Globalization.CultureInfo.InvariantCulture);
                            string routingKey = $"update.{Guid.NewGuid()}";
                            try
                            {
                                channel.BasicPublish(exchange: exchangeName, routingKey: routingKey,
                                    basicProperties: null, body: buffer, mandatory: true);
                                channel.WaitForConfirmsOrDie();
                                Console.WriteLine($"[INFO] PRODUCER sent message at {now}");
                            }
                            catch (AlreadyClosedException ex)
                            {
                                // Logged only; the shutdown handlers and the IsOpen check
                                // decide on later ticks whether to keep waiting.
                                Console.Error.WriteLine($"[WARNING] PRODUCER caught already closed exception: {ex}");
                            }
                        }
                        else
                        {
                            Console.WriteLine($"[INFO] PRODUCER waiting for connection to open: {maybeExitTries}");
                            if (maybeExit)
                            {
                                maybeExitTries++;
                            }

                            // Wait for 60 seconds for connection recovery
                            if (maybeExitTries > 60)
                            {
                                Console.Error.WriteLine("[ERROR] PRODUCER could not auto-recover connection within 60 seconds, re-connecting!");
                                break;
                            }
                        }
                    }
                }
            }

            Console.WriteLine("[INFO] PRODUCER: about to Dispose() connection...");
        }

        Console.WriteLine("[INFO] PRODUCER: Dispose() connection complete.");
    }
    catch (Exception ex)
    {
        // Top-level boundary for one connection's lifetime: log and return so the
        // caller can decide whether to reconnect.
        Console.Error.WriteLine($"[ERROR] PRODUCER exception: {ex}");
    }
}
// Main reconnect loop: connect, publish until Publish() returns, then reset the
// connection state. Always runs at least once; stops once `running` is false.
while (true)
{
    Connect();
    Publish();

    // Publish() has disposed the connection; clear the state so the next
    // Connect() creates a new one.
    connected = false;
    connection = null;

    if (!running)
    {
        break;
    }
}