diff --git a/src/Services/Basket/Basket.API/Basket.API.csproj b/src/Services/Basket/Basket.API/Basket.API.csproj
index fc040b3b0..1017dbd05 100644
--- a/src/Services/Basket/Basket.API/Basket.API.csproj
+++ b/src/Services/Basket/Basket.API/Basket.API.csproj
@@ -39,6 +39,10 @@
+
+
+
+
Always
diff --git a/src/Services/Basket/Basket.API/Events/CatalogPriceChangedHandler.cs b/src/Services/Basket/Basket.API/Events/CatalogPriceChangedHandler.cs
new file mode 100644
index 000000000..67fc6616f
--- /dev/null
+++ b/src/Services/Basket/Basket.API/Events/CatalogPriceChangedHandler.cs
@@ -0,0 +1,25 @@
+using Microsoft.eShopOnContainers.Services.Basket.API.Model;
+using Microsoft.eShopOnContainers.Services.Common.Infrastructure;
+using Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Basket.API.Events
+{
+ public class CatalogPriceChangedHandler : IIntegrationEventHandler
+ {
+ private readonly IBasketRepository _repository;
+ public CatalogPriceChangedHandler()
+ {
+ //_repository = repository;
+ }
+
+ public void Handle(CatalogPriceChanged @event)
+ {
+
+ }
+ }
+}
+
diff --git a/src/Services/Basket/Basket.API/Startup.cs b/src/Services/Basket/Basket.API/Startup.cs
index c72d68533..2eb6a36c1 100644
--- a/src/Services/Basket/Basket.API/Startup.cs
+++ b/src/Services/Basket/Basket.API/Startup.cs
@@ -13,6 +13,9 @@ using Microsoft.Extensions.Options;
using System.Net;
using Swashbuckle.Swagger.Model;
using Microsoft.eShopOnContainers.Services.Basket.API.Auth.Server;
+using Microsoft.eShopOnContainers.Services.Common.Infrastructure;
+using Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog;
+using Basket.API.Events;
namespace Microsoft.eShopOnContainers.Services.Basket.API
{
@@ -76,6 +79,9 @@ namespace Microsoft.eShopOnContainers.Services.Basket.API
});
services.AddTransient();
+ var eventBus = new EventBus();
+ services.AddSingleton(eventBus);
+ eventBus.Subscribe(new CatalogPriceChangedHandler());
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
diff --git a/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs b/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs
index bba7564e9..3558c55f5 100644
--- a/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs
+++ b/src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs
@@ -107,7 +107,7 @@ namespace Microsoft.eShopOnContainers.Services.Catalog.API.Controllers
//hook to run integration tests until POST methods are created
if (catalogTypeId.HasValue && catalogTypeId == 1)
{
- _eventBus.Publish(new CatalogPriceChanged());
+ _eventBus.Publish(new CatalogPriceChanged(2, 10.4M));
}
return Ok(model);
diff --git a/src/Services/Common/Infrastructure/Catalog/CatalogPriceChanged.cs b/src/Services/Common/Infrastructure/Catalog/CatalogPriceChanged.cs
index ad22ae49f..6e5715925 100644
--- a/src/Services/Common/Infrastructure/Catalog/CatalogPriceChanged.cs
+++ b/src/Services/Common/Infrastructure/Catalog/CatalogPriceChanged.cs
@@ -6,15 +6,16 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog
{
public class CatalogPriceChanged : IIntegrationEvent
{
- private readonly string _eventName = "catalogpricechanged";
+ public string Message { get { return "CatalogPriceChanged here!!"; } }
- public string Name {
- get
- {
- return _eventName;
- }
- }
+ public int ItemId { get; private set; }
+
+ public decimal NewPrice { get; private set; }
- public string Message { get { return "CatalogPriceChanged!!"; } }
- }
+ public CatalogPriceChanged(int itemId, decimal newPrice)
+ {
+ ItemId = itemId;
+ NewPrice = newPrice;
+ }
+}
}
diff --git a/src/Services/Common/Infrastructure/Catalog/CatalogPriceChangedHandler.cs b/src/Services/Common/Infrastructure/Catalog/CatalogPriceChangedHandler.cs
deleted file mode 100644
index 8fc045ba0..000000000
--- a/src/Services/Common/Infrastructure/Catalog/CatalogPriceChangedHandler.cs
+++ /dev/null
@@ -1,14 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog
-{
- public class CatalogPriceChangedHandler : IIntegrationEventHandler
- {
- public void Handle(CatalogPriceChanged @event)
- {
- throw new NotImplementedException();
- }
- }
-}
diff --git a/src/Services/Common/Infrastructure/EventBus.cs b/src/Services/Common/Infrastructure/EventBus.cs
index 42ef384f1..a606a6d33 100644
--- a/src/Services/Common/Infrastructure/EventBus.cs
+++ b/src/Services/Common/Infrastructure/EventBus.cs
@@ -1,40 +1,47 @@
using Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog;
+using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
+using System.Collections;
using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
using System.Text;
namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure
{
public class EventBus : IEventBus
{
- private readonly Dictionary> _handlers;
- private readonly Dictionary> _listeners;
+ private readonly string _brokerName = "event_bus";
+ private readonly Dictionary> _handlers;
+ private readonly List _eventTypes;
+
+ private Tuple _connection;
+ private string _queueName;
+
public EventBus()
{
_handlers = new Dictionary>();
- _listeners = new Dictionary>();
+ _eventTypes = new List();
}
public void Publish(IIntegrationEvent @event)
{
+ var eventName = @event.GetType().Name;
var factory = new ConnectionFactory() { HostName = "172.20.0.1" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
- {
- channel.QueueDeclare(queue: @event.Name,
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
+ {
+ channel.ExchangeDeclare(exchange: _brokerName,
+ type: "direct");
- string message = ((CatalogPriceChanged)@event).Message;
+ string message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: "",
- routingKey: @event.Name,
+ channel.BasicPublish(exchange: _brokerName,
+ routingKey: eventName,
basicProperties: null,
body: body);
}
@@ -50,30 +57,14 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure
}
else
{
- var factory = new ConnectionFactory() { HostName = "172.18.0.1" };
- var connection = factory.CreateConnection();
- var channel = connection.CreateModel();
-
- channel.QueueDeclare(queue: eventName,
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
-
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body;
- var message = Encoding.UTF8.GetString(body);
- };
- channel.BasicConsume(queue: "hello",
- noAck: true,
- consumer: consumer);
- ;
-
- _listeners.Add(eventName, new Tuple(channel, connection));
+ var channel = GetChannel();
+ channel.QueueBind(queue: _queueName,
+ exchange: _brokerName,
+ routingKey: eventName);
+
_handlers.Add(eventName, new List());
_handlers[eventName].Add(handler);
+ _eventTypes.Add(typeof(T));
}
}
@@ -88,13 +79,69 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure
if (_handlers[eventName].Count == 0)
{
_handlers.Remove(eventName);
+ var eventType = _eventTypes.Single(e => e.Name == eventName);
+ _eventTypes.Remove(eventType);
+ _connection.Item1.QueueUnbind(queue: _queueName,
+ exchange: _brokerName,
+ routingKey: eventName);
+
+ if (_handlers.Keys.Count == 0)
+ {
+ _queueName = string.Empty;
+ _connection.Item1.Close();
+ _connection.Item2.Close();
+ }
+
+ }
+ }
+ }
- var connectionItems =_listeners[eventName];
- _listeners.Remove(eventName);
+ private IModel GetChannel()
+ {
+ if (_connection != null)
+ {
+ return _connection.Item1;
+ }
+ else
+ {
+ var factory = new ConnectionFactory() { HostName = "172.20.0.1" };
+ var connection = factory.CreateConnection();
+ var channel = connection.CreateModel();
- connectionItems.Item1.Close();
- connectionItems.Item2.Close();
+ channel.ExchangeDeclare(exchange: _brokerName,
+ type: "direct");
+ if (string.IsNullOrEmpty(_queueName))
+ {
+ _queueName = channel.QueueDeclare().QueueName;
}
+
+ var consumer = new EventingBasicConsumer(channel);
+ consumer.Received += (model, ea) =>
+ {
+ var eventName = ea.RoutingKey;
+ if (_handlers.ContainsKey(eventName))
+ {
+ var message = Encoding.UTF8.GetString(ea.Body);
+ Type eventType = _eventTypes.Single(t => t.Name == eventName);
+
+ var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
+ var handlers = _handlers[eventName];
+
+
+ var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
+
+ foreach (var handler in handlers)
+ {
+ concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
+ }
+ }
+ };
+ channel.BasicConsume(queue: _queueName,
+ noAck: true,
+ consumer: consumer);
+ _connection = new Tuple(channel, connection);
+
+ return _connection.Item1;
}
}
}
diff --git a/src/Services/Common/Infrastructure/IIntegrationEvent.cs b/src/Services/Common/Infrastructure/IIntegrationEvent.cs
index 9e92db3a4..48d991283 100644
--- a/src/Services/Common/Infrastructure/IIntegrationEvent.cs
+++ b/src/Services/Common/Infrastructure/IIntegrationEvent.cs
@@ -5,7 +5,6 @@ using System.Text;
namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure
{
public interface IIntegrationEvent
- {
- string Name { get; }
+ {
}
}
diff --git a/src/Services/Common/Infrastructure/IIntegrationEventHandler.cs b/src/Services/Common/Infrastructure/IIntegrationEventHandler.cs
index b2d902ca2..49d194ecb 100644
--- a/src/Services/Common/Infrastructure/IIntegrationEventHandler.cs
+++ b/src/Services/Common/Infrastructure/IIntegrationEventHandler.cs
@@ -10,5 +10,7 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure
void Handle(TIntegrationEvent @event);
}
- public interface IIntegrationEventHandler { }
+ public interface IIntegrationEventHandler
+ {
+ }
}
diff --git a/src/Services/Common/Infrastructure/Infrastructure.csproj b/src/Services/Common/Infrastructure/Infrastructure.csproj
index fc7038ce9..34f8bc5d5 100644
--- a/src/Services/Common/Infrastructure/Infrastructure.csproj
+++ b/src/Services/Common/Infrastructure/Infrastructure.csproj
@@ -5,6 +5,7 @@
Microsoft.eShopOnContainers.Services.Common.Infrastructure
+
\ No newline at end of file