Wednesday, 31 May 2023

Docker Compose Support

Introduction
The explanation is as follows
When your app is starting up, the Docker Compose integration will look for a configuration file in the current working directory. The following files are supported:

- compose.yaml

- compose.yml

- docker-compose.yaml

- docker-compose.yml

To use a non-standard file, set the spring.docker.compose.file property.
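The lookup described above can be sketched in plain Java. This is only an illustration, not Spring Boot's actual implementation: the class name ComposeFileLocator is hypothetical, and checking the candidates in the listed order is an assumption.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;

// Illustrative only: mimics the documented lookup, not Spring Boot's real code.
class ComposeFileLocator {

  // Candidate names come from the supported-file list; the order is an assumption.
  static final List<String> CANDIDATES = List.of(
      "compose.yaml", "compose.yml", "docker-compose.yaml", "docker-compose.yml");

  // Returns the first candidate that exists as a regular file in the directory.
  static Optional<Path> find(Path workingDirectory) {
    return CANDIDATES.stream()
        .map(workingDirectory::resolve)
        .filter(Files::isRegularFile)
        .findFirst();
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("compose-demo");
    Files.createFile(dir.resolve("docker-compose.yml"));
    System.out.println(find(dir).map(Path::getFileName).orElse(null));
  }
}
```

If none of the candidates exists, the result is empty, which is the case where setting spring.docker.compose.file would be needed.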
Maven
We do it like this
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-docker-compose</artifactId>
    <optional>true</optional>
</dependency>
Gradle
We do it like this
dependencies {
  developmentOnly("org.springframework.boot:spring-boot-docker-compose")
}
Example - docker-compose.yaml
We do it like this. This way the database is also started when the application runs
version: '3.1'


services:
  db:
    image: postgres
    restart: always
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: merchant_db
The spring.docker.compose.lifecycle-management Property
The explanation is as follows
none: Do not start or stop the docker-compose file
start-only: Start the docker-compose file when the application starts and leave it running
start-and-stop: Start Docker Compose when the application starts and stop it when the JVM exits (This is the default behavior)
Example
We do it like this
spring.docker.compose.lifecycle-management = START_AND_STOP
The spring.docker.compose.file Property
Spring Boot also runs the Docker Compose file specified in application.properties. If desired, profiles can also be defined in the docker-compose.yml file, so that a container is started only when its Compose profile is active.

Example
We do it like this.
spring:
  docker:
   compose:
     file: "./docker/compose.yml"
The spring.docker.compose.profiles.active Property
Example
We do it like this.
spring:
  docker:
    compose:
      file: "./docker/docker-compose.yml"
      profiles.active: "indexer"













SpringData Multitenancy - Separate Database with Hibernate

Introduction
This approach works if we use Hibernate as the JPA provider.

In Short
We write a new ConnectionProvider that extends the AbstractMultiTenantConnectionProvider class provided by Hibernate. This class switches to the specified schema. The following methods are overridden when coding this class
protected ConnectionProvider getAnyConnectionProvider();
public Connection getConnection(String tenantIdentifier);
protected ConnectionProvider selectConnectionProvider(String tenantIdentifier);
The AbstractMultiTenantConnectionProvider Class

Example
We do it like this
public class SchemaMultiTenantConnectionProvider 
  extends AbstractMultiTenantConnectionProvider {

  private static final String HIBERNATE_PROPERTIES_PATH = "/application.properties";
  private final Map<String, ConnectionProvider> connectionProviderMap;

  public SchemaMultiTenantConnectionProvider() {
    this.connectionProviderMap = new HashMap<String, ConnectionProvider>();
  }
        
  @Override
  public Connection getConnection(String tenantIdentifier) throws SQLException {
    Connection connection = super.getConnection(tenantIdentifier);
    connection.createStatement().execute(
      String.format("SET SCHEMA '%s';", tenantIdentifier));
    return connection;
  }
        
  @Override
  protected ConnectionProvider getAnyConnectionProvider() {
    return getConnectionProvider(TenantContext.DEFAULT_TENANT_ID);
  }
 
  @Override
  protected ConnectionProvider selectConnectionProvider(String tenantIdentifier) {
    return getConnectionProvider(tenantIdentifier);
  }
        
  private ConnectionProvider getConnectionProvider(String tenantIdentifier) {
    return Optional.ofNullable(tenantIdentifier)
      .map(connectionProviderMap::get)
      .orElseGet(() -> createNewConnectionProvider(tenantIdentifier));
  }
 
  private ConnectionProvider createNewConnectionProvider(String tenantIdentifier) {
    return Optional.ofNullable(tenantIdentifier)
      .map(this::createConnectionProvider)
      .map(connectionProvider -> {
        connectionProviderMap.put(tenantIdentifier, connectionProvider);
        return connectionProvider;
      })
      .orElseThrow(() ->
        new ConnectionProviderException(
          String.format("Cannot create new connection provider for tenant: %s",
            tenantIdentifier)));
  }

  private ConnectionProvider createConnectionProvider(String tenantIdentifier) {
    return Optional.ofNullable(tenantIdentifier)
      .map(this::getHibernatePropertiesForTenantId)
      .map(this::initConnectionProvider)
      .orElse(null);
  }
        
  private Properties getHibernatePropertiesForTenantId(String tenantId) {
    try {
      Properties properties = new Properties();
      properties.load(getClass().getResourceAsStream(HIBERNATE_PROPERTIES_PATH));
      return properties;
    } catch (IOException e) {
      throw new RuntimeException(
        String.format("Cannot open hibernate properties: %s", HIBERNATE_PROPERTIES_PATH), e);
    }
  }
 
  private ConnectionProvider initConnectionProvider(Properties hibernateProperties) {
    DriverManagerConnectionProviderImpl connectionProvider = 
      new DriverManagerConnectionProviderImpl();
    connectionProvider.configure(hibernateProperties);
    return connectionProvider;
  }      
}
The explanation is as follows
In this case, we have one method to override: getConnection(). We call an SQL that changes the schema on the connection we’ve already created. In this implementation, we assumed that the tenantId is also the name of the schema. With tenantId mapping, the name of the schema can also be implemented from this side.
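The tenantId-to-schema mapping mentioned in the explanation could look roughly like the sketch below. TenantSchemaMapper is a hypothetical helper that does not appear in the original code, and the identifier check is an added assumption: because the schema name ends up inside a SET SCHEMA statement built with String.format, the sketch only accepts plain identifiers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper: maps a tenant id to a schema name and validates it
// before it is placed into a "SET SCHEMA" statement.
class TenantSchemaMapper {

  // Letters, digits and underscores only, so the name cannot break out of the SQL.
  private static final Pattern SAFE_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

  private final Map<String, String> schemaByTenant;

  TenantSchemaMapper(Map<String, String> schemaByTenant) {
    this.schemaByTenant = new HashMap<>(schemaByTenant);
  }

  // Falls back to the tenant id itself when no explicit mapping exists.
  String schemaFor(String tenantId) {
    String schema = schemaByTenant.getOrDefault(tenantId, tenantId);
    if (schema == null || !SAFE_NAME.matcher(schema).matches()) {
      throw new IllegalArgumentException("Unsafe schema name for tenant: " + tenantId);
    }
    return schema;
  }
}
```

Under these assumptions, getConnection() would pass the tenant identifier through schemaFor() before formatting the SQL.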

SpringData Multitenancy - Shared Database, Separate Schemas with Hibernate

Introduction
This approach works if we use Hibernate as the JPA provider. Hibernate normally supports the "separate database" or "separate schema" approach. The explanation is as follows
In Hibernate, we will have 2 classes to implement:

org.hibernate.context.spi.CurrentTenantIdentifierResolver — which will define a tenantId for Hibernate, thanks to which it will know what resources to get to;

org.hibernate.engine.jdbc.connections.spi.AbstractMultiTenantConnectionProvider — will open a connection to resources based on the CurrentTenantIdentifierResolver tenant id returned.
This post covers the "separate schema" approach. The explanation is as follows
In this approach, all tenants share a single database, but each has its own schema. This pattern provides a balance between data isolation and resource optimization. With Hibernate, which Spring Boot uses by default for ORM, you can dynamically set the schema based on the tenant context.

For this approach, you can use a similar DataSource setup as the first approach. However, instead of having different data sources, you'll switch the Hibernate default schema dynamically. 
In Short
1. We code a bean that implements the CurrentTenantIdentifierResolver interface. In this bean we implement the resolveCurrentTenantIdentifier() method and return the name of the schema we want Hibernate to use.

The CurrentTenantIdentifierResolver Interface
We include the following line
import org.hibernate.context.spi.CurrentTenantIdentifierResolver;
resolveCurrentTenantIdentifier method
Example
We do it like this
public class TenantSchemaResolver implements CurrentTenantIdentifierResolver {
  @Override
  public String resolveCurrentTenantIdentifier() {
    return TenantContext.getTenantSchema(); // Get tenant's schema from tenant context
  }
  @Override
  public boolean validateExistingCurrentSessions() {
    return true;
  }
}
Example
We do it like this
@Component
@Scope(value = "request", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class TenantIdentifierResolver implements CurrentTenantIdentifierResolver {
        
  @Override
  public String resolveCurrentTenantIdentifier() {
    return Optional.ofNullable(TenantContext.getCurrentTenant())
                   .orElse(TenantContext.DEFAULT_TENANT_ID);
  }
 
  @Override
  public boolean validateExistingCurrentSessions() {
    return true;
  }
}
The explanation is as follows
- The resolveCurrentTenantIdentifier() method returns the tenantId for the tenant in the context.
- If we want Hibernate to validate all existing sessions for the indicated tenantId, then we return the value true in the validateExistingCurrentSessions() method.
TenantContext is as follows. It only holds the ThreadLocal object
public class TenantContext {
 
  public static final String DEFAULT_TENANT_ID = "public";
  private static ThreadLocal<String> currentTenant = new ThreadLocal<>();
 
  public static void setCurrentTenant(String tenant) {
    currentTenant.set(tenant);
  }
 
  public static String getCurrentTenant() {
    return currentTenant.get();
  }
 
  public static void clear() {
    currentTenant.remove();
  }      
}
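Why the ThreadLocal has to be cleared can be shown with a small plain-Java sketch. TenantContextDemo is a hypothetical stand-in for the TenantContext class, repeated so that the example is self-contained, and the single-thread executor is an assumption standing in for a server's pooled request threads.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for TenantContext, repeated so the sketch is self-contained.
class TenantContextDemo {

  private static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();

  // Sets the tenant, does the "work", and always clears the value afterwards
  // so a pooled thread does not carry the tenant into the next task.
  static String runAs(String tenant) {
    CURRENT_TENANT.set(tenant);
    try {
      return "handled for " + CURRENT_TENANT.get();
    } finally {
      CURRENT_TENANT.remove();
    }
  }

  static String currentTenant() {
    return CURRENT_TENANT.get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Callable<String> request = () -> runAs("tenant_a");
      Callable<String> nextTask = TenantContextDemo::currentTenant;
      System.out.println(pool.submit(request).get());
      // Same pooled thread, but the tenant was removed in the finally block.
      System.out.println(pool.submit(nextTask).get());
    } finally {
      pool.shutdown();
    }
  }
}
```

Without the remove() call in the finally block, the second task on the same thread would still see the tenant of the previous one.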
Now we need an AsyncHandlerInterceptor, so that the schema id is set at the start of every request and cleared at the end of the request. We do it like this
@Component
public class TenantRequestInterceptor implements AsyncHandlerInterceptor {
        
  private SecurityDomain securityDomain;
        
  public TenantRequestInterceptor(SecurityDomain securityDomain) {
    this.securityDomain = securityDomain;
  }
 
  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, 
    Object handler) {
    return Optional.ofNullable(request)
      .map(req -> securityDomain.getTenantIdFromJwt(req))
      .map(tenant -> setTenantContext(tenant))
      .orElse(false);
  }
 
  @Override
  public void afterCompletion(HttpServletRequest request, HttpServletResponse response, 
    Object handler, Exception ex) {
    // afterCompletion also runs when the handler throws, unlike postHandle
    TenantContext.clear();
  }
         
  private boolean setTenantContext(String tenant) {
    TenantContext.setCurrentTenant(tenant);
    return true;
  }
}
To register this AsyncHandlerInterceptor object with Spring, we do it like this
@Configuration
public class WebConfiguration implements WebMvcConfigurer {
 
  @Autowired
  private TenantRequestInterceptor tenantInterceptor;
        
  @Override
  public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(tenantInterceptor).addPathPatterns("/**");
  }    
}
The settings are as follows
spring.jpa.properties.hibernate.multiTenancy=

spring.jpa.properties.hibernate.tenant_identifier_resolver=

spring.jpa.properties.hibernate.multi_tenant_connection_provider=
The explanation is as follows
The first parameter is used to define what multitenancy strategy we are adopting. We have a choice of values here:

NONE — default value — multitenancy is disabled. In this strategy, if we set a tenantId, Hibernate will throw an exception.
SCHEMA — for the separate schema strategy.
DATABASE — for the separate database strategy.
DISCRIMINATOR — for the shared schema strategy (not yet implemented in Hibernate. It was planned for Hibernate 5, but this issue is still open, so we need to wait.)
The explanation is as follows
The other two parameters point to the corresponding class implementations for CurrentTenantIdentifierResolver and AbstractMultiTenantConnectionProvider.
The connection settings also need to be specified for both Hibernate and Spring. We do it like this
hibernate.connection.url=
hibernate.connection.username=
hibernate.connection.password=
spring.datasource.url=${hibernate.connection.url}
spring.datasource.username=${hibernate.connection.username}
spring.datasource.password=${hibernate.connection.password}


Tuesday, 30 May 2023

SpringCloud Stream MessageRoutingCallback Class

Introduction
The explanation is as follows
CloudEvents is an open standard that provides a common format for describing event data and metadata, making it easier to share events between different systems. 
Example
Suppose we have NewsEvent and AlertEvent class hierarchies. The producer sends them using the CloudEventMessageBuilder class as follows
@Component
public class NewsEventProducer {

  private final StreamBridge streamBridge;

  public NewsEventProducer(StreamBridge streamBridge) {
    this.streamBridge = streamBridge;
  }
  ...
  public Message<NewsEvent> send(String key, NewsEvent newsEvent) {
    Message<NewsEvent> message = CloudEventMessageBuilder
      .withData(newsEvent)
      .setHeader("partitionKey", key)
      .build();
    streamBridge.send("news-out-0", message);
    ...
    return message;
  }
}

@Component
public class AlertEventProducer {

  private final StreamBridge streamBridge;

  public AlertEventProducer(StreamBridge streamBridge) {
    this.streamBridge = streamBridge;
  }
  ...
  public Message<AlertEvent> send(String key, AlertEvent alertEvent) {
    Message<AlertEvent> message = CloudEventMessageBuilder
      .withData(alertEvent)
      .setHeader("partitionKey", key)
      .build();
    streamBridge.send("alert-out-0", message);
    return message;
  }
}
The producer's application.properties is as follows. Messages written to the alert-out-0 and news-out-0 channels are written to the alert.events and news.events topics in Kafka
spring.cloud.stream.output-bindings=news;alert
...
spring.cloud.stream.bindings.news-out-0.destination=news.events
spring.cloud.stream.bindings.alert-out-0.destination=alert.events
...
On the consumer side, application.properties is as follows. We specify which topics we want to listen to. Here we listen to the news.events and alert.events topics in Kafka
spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=news.events,alert.events
The consumer dispatches events with MessageRoutingCallback. We do it like this. All we need to do is put into the appConfigurationProperties map the name of the channel (the consumer function) that corresponds to the fully qualified name of the event class
@Configuration
public class MessageRoutingConfig {

  private AppConfigurationProperties appConfigurationProperties;

  public MessageRoutingConfig(AppConfigurationProperties appConfigurationProperties) {
    this.appConfigurationProperties = appConfigurationProperties;
  }

  @Bean
  public MessageRoutingCallback messageRoutingCallback() {
    return new MessageRoutingCallback() {
      @Override
      public String routingResult(Message<?> message) {
        return appConfigurationProperties.getRoutingMap()
          .getOrDefault(CloudEventMessageUtils.getType(message), "unknownEvent");
      }
    };
  }

  @Bean
  public Consumer<Message<?>> unknownEvent() {
    return message -> log.warn("...", message.getHeaders(), message.getPayload());
  }
}
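The routing decision itself is just a map lookup with a fallback, which can be sketched without any Spring types. EventRouter and the event type names used with it are hypothetical; in the configuration above the map comes from AppConfigurationProperties and the type is read with CloudEventMessageUtils.getType(message).

```java
import java.util.Map;

// Hypothetical stand-in for the routing map held by AppConfigurationProperties.
class EventRouter {

  private static final String FALLBACK = "unknownEvent";

  private final Map<String, String> routingMap;

  EventRouter(Map<String, String> routingMap) {
    this.routingMap = Map.copyOf(routingMap);
  }

  // Returns the name of the function bean that should handle this event type.
  String route(String eventType) {
    if (eventType == null) {
      return FALLBACK; // a message without a type goes to the fallback consumer
    }
    return routingMap.getOrDefault(eventType, FALLBACK);
  }
}
```

For example, a map entry from a hypothetical type "com.example.CNNNewsCreated" to "cnnNewsCreated" would send such events to the cnnNewsCreated consumer bean, and anything unmapped to unknownEvent.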
The actual consumer code is as follows
@Component
public class NewsEventConsumer {

  @Bean
  public Consumer<Message<CNNNewsCreated>> cnnNewsCreated() {
    return message -> ...;
  }

  @Bean
  public Consumer<Message<DWNewsCreated>> dwNewsCreated() {
    return message -> ...;
  }

  @Bean
  public Consumer<Message<RAINewsCreated>> raiNewsCreated() {
    return message -> ...;
  }
}

@Component
public class AlertEventConsumer {

  @Bean
  public Consumer<Message<EarthquakeAlert>> earthquakeAlert() {
    return message -> ...;
  }

  @Bean
  public Consumer<Message<WeatherAlert>> weatherAlert() {
    return message -> ...;
  }
}


Friday, 26 May 2023

SpringBoot spring.jpa Hibernate-Specific Settings - Query Plan Cache

Introduction
The explanation is as follows
Every JPQL query or Criteria query is parsed into an Abstract Syntax Tree (AST) prior to execution so that Hibernate can generate the SQL statement. Since query compilation takes time, Hibernate provides a QueryPlanCache for better performance.
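The idea of a bounded plan cache can be sketched in plain Java. This is a conceptual illustration and not Hibernate's QueryPlanCache: the class name PlanCacheSketch, the LRU eviction policy and the String "plan" are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Conceptual sketch of a bounded query plan cache; not Hibernate's QueryPlanCache.
class PlanCacheSketch {

  private final Map<String, String> plans;
  private final Function<String, String> compiler;
  private int compilations = 0;

  PlanCacheSketch(int maxSize, Function<String, String> compiler) {
    this.compiler = compiler;
    // Access-ordered LinkedHashMap that evicts the least recently used plan.
    this.plans = new LinkedHashMap<String, String>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > maxSize;
      }
    };
  }

  // Compiles the query only on a cache miss; hits reuse the stored plan.
  String planFor(String jpql) {
    String plan = plans.get(jpql);
    if (plan == null) {
      compilations++;
      plan = compiler.apply(jpql);
      plans.put(jpql, plan);
    }
    return plan;
  }

  int compilations() {
    return compilations;
  }
}
```

The sketch also hints at why the cache size matters: if the application issues more distinct query strings than the cache can hold, plans are evicted and the same query is compiled again.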

SpringBoot spring.jpa Hibernate-Specific Settings - Query Cache

Introduction
The explanation is as follows
Hibernate can also cache result set of a query. Hibernate Query Cache doesn’t cache the state of the actual entities in the cache; it caches only identifier values and results of value type. So it should always be used in conjunction with the second-level cache.
The explanation is as follows. Using it is not recommended
Caching of query results introduces some overhead in terms of your application's normal transactional processing. For example, if you cache the results of a query against a Person, Hibernate will need to keep track of when those results should be invalidated because changes have been committed against any Person entity.

That, coupled with the fact that most applications simply gain no benefit from caching query results, leads Hibernate to disable caching of query results by default.
The explanation is as follows
For all tables that are queried as part of cacheable queries, Hibernate keeps last update timestamps in a separate region named org.hibernate.cache.spi.UpdateTimestampsCache. Being aware of this region is very important if we use query caching because Hibernate uses it to verify that cached query results aren't stale. The entries in this cache must not be evicted/expired as long as there are cached query results for the corresponding tables in the query results regions. It's best to turn off automatic eviction and expiration for this cache region, as it doesn't consume lots of memory anyway.
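The staleness check described above can be illustrated with a small plain-Java sketch. It is a conceptual model only, not Hibernate's implementation: QueryCacheSketch, its method names and the logical clock are assumptions made to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch of timestamp-based invalidation; not Hibernate's implementation.
class QueryCacheSketch {

  private static final class CachedResult {
    final List<Long> ids;
    final long cachedAt;

    CachedResult(List<Long> ids, long cachedAt) {
      this.ids = ids;
      this.cachedAt = cachedAt;
    }
  }

  // Plays the role of the update timestamps region: table name -> last update.
  private final Map<String, Long> lastUpdateByTable = new HashMap<>();
  private final Map<String, CachedResult> results = new HashMap<>();
  private long clock = 0; // logical clock keeps the sketch deterministic

  void tableUpdated(String table) {
    lastUpdateByTable.put(table, ++clock);
  }

  void put(String query, List<Long> ids) {
    results.put(query, new CachedResult(ids, ++clock));
  }

  // Returns cached ids only if no queried table changed after they were cached.
  List<Long> get(String query, List<String> tables) {
    CachedResult cached = results.get(query);
    if (cached == null) {
      return null;
    }
    for (String table : tables) {
      if (lastUpdateByTable.getOrDefault(table, 0L) > cached.cachedAt) {
        results.remove(query); // stale: a table was updated after caching
        return null;
      }
    }
    return cached.ids;
  }
}
```

In this model, losing an entry from lastUpdateByTable would make a stale result look fresh, which is why the quoted text advises against eviction and expiration for that region.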
Where Is the Query Cache Stored
It used to be stored in a class named org.hibernate.cache.internal.StandardQueryCache. There is a question about it here. However, I think this class was removed in Hibernate 5.3

Settings
hibernate.cache.use_query_cache
There are different caches such as collection_cache and query_cache. The explanation is as follows
Set this to true to enable the query cache.
Code
Example
We do it like this
org.hibernate.Session session = (Session) entityManager.getDelegate();
Statistics statistics = session.getSessionFactory().getStatistics();
// check these variables
// queryCacheHitCount
// queryCacheMissCount
// queryCachePutCount
Example - Spring Repository
We do it like this. Everything returned by the findAll() method is stored in the Hibernate Query Cache
import jakarta.persistence.QueryHint;
import org.springframework.data.jpa.repository.QueryHints;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import static org.hibernate.jpa.HibernateHints.HINT_CACHEABLE;

@Repository
public interface BookRepository extends CrudRepository<Book, Long> {

  @Override
  @QueryHints(@QueryHint(name = HINT_CACHEABLE, value = "true"))
  Iterable<Book> findAll();
}
Example
We do it like this
@Repository
public interface AuthorRepository extends CrudRepository<Author, Integer> {

  @QueryHints({
    @QueryHint(name = "org.hibernate.cacheable", value = "true")
  })
  Author findByName(String name);
}
Example - Spring Repository
We do it like this. Here a name (region) is specified for the Query Cache
import org.springframework.data.jpa.repository.QueryHints;
import javax.persistence.QueryHint;

import static org.hibernate.jpa.QueryHints.HINT_CACHEABLE;
import static org.hibernate.jpa.QueryHints.HINT_CACHE_REGION;

// some code is skipped

@QueryHints({
  @QueryHint(name = HINT_CACHEABLE, value = "true"),
  @QueryHint(name = HINT_CACHE_REGION, value = "query-cache-users")
})
@Override
Iterable<User> findAll();
Example - EntityManager
We do it like this
TypedQuery<BookPublisherValue> q = em.createQuery(
  "SELECT new org.thoughts.on.java.model.BookPublisherValue(b.title, p.name) "
    + "FROM Book b JOIN b.publisher p WHERE b.id = :id",
  BookPublisherValue.class);
q.setHint(QueryHints.CACHEABLE, true);
q.setParameter("id", 1L);
BookPublisherValue value = q.getSingleResult();
EhCache
Example
We do it like this
spring.jpa.properties.hibernate.cache.use_query_cache=true
spring.jpa.properties.hibernate.cache.region.factory_class=org.hibernate.cache.ehcache.EhCacheRegionFactory





Tuesday, 23 May 2023

SpringWebFlux ResponseBodyResultHandler Class

Introduction
We include the following line
import org.springframework.web.reactive.result.method.annotation.ResponseBodyResultHandler;
The explanation is as follows
- it runs after the controller has finished processing
- supports method : to make logical evaluation if the handler is to be used or not
- handleResult method : where we modify the response
handleResult method
Example
We do it like this
@Override
public Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
  ServiceResponse s = new ServiceResponse();
  s.setMethod(exchange.getRequest().getMethod().name());
  s.setStatus(exchange.getResponse().getStatusCode().value());
  s.setCorrelationId(exchange.getAttribute("correlation-id"));
  var adapter = getAdapter(result);
  // modify the result as you want
  if (adapter != null) { // if the response was wrapped inside Mono?
    Mono<ServiceResponse> body = ((Mono<Object>) result.getReturnValue())
      .map(o -> {
        s.setData(o);
        return s;
    });
    return writeBody(body, result.getReturnTypeSource().nested(), exchange);
  } else { // if the response was not wrapped inside Mono
    s.setData(result.getReturnValue());
    Mono<ServiceResponse> body = Mono.just(s);
    return writeBody(body, result.getReturnTypeSource().nested(), exchange);
  }
}
supports method
Example
We do it like this
public class CustomResponseBodyResultHandler extends ResponseBodyResultHandler {

  public CustomResponseBodyResultHandler(List<HttpMessageWriter<?>> writers, 
                                         RequestedContentTypeResolver resolver) {
    super(writers, resolver);
  }

  @Override
  public boolean supports(HandlerResult result) {
    var className = result.getReturnTypeSource().getDeclaringClass().getName();
    var methodName = result.getReturnTypeSource().getMethod().getName();
    var classAnnotations = result.getReturnTypeSource().getDeclaringClass()
      .getAnnotations();
    var methodAnnotations = result.getReturnTypeSource().getMethodAnnotations();
    var annotations = result.getReturnTypeSource().getDeclaringClass().getAnnotations();

    if (Arrays.stream(classAnnotations)
      .anyMatch(a -> a.annotationType() == ApiResponse.class)) {
        log.info("{} is marked with ApiResponse annotation", className);
        return true;
    } else if (Arrays.stream(methodAnnotations)
      .anyMatch(a -> a.annotationType() == ApiResponse.class)) {
      return true;
    }
    return false;
  }
  ...
}

Sunday, 21 May 2023

SpringData MongoDB @GeoSpatialIndexed Annotation

Introduction
We include the following line
import org.springframework.data.mongodb.core.index.GeoSpatialIndexed;
Example
We do it like this
import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexed;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
public class User {
    
  private String id;
  private String name;
  @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
  private Location location;
  private List<String> friendIds;
  // getters and setters omitted for brevity
}
@Document
public class Place {
  private String id;
  private String name;
  @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
  private Location location;
    
  // getters and setters omitted for brevity
}
public class Location {
  private double x;
  private double y;
    
  // getters and setters omitted for brevity
}
We do it like this
public interface UserRepository extends MongoRepository<User, String> {
  // Derived geo query used by UserService
  List<User> findByLocationNear(Point location, Distance distance);
}

public interface PlaceRepository extends MongoRepository<Place, String> {
  // Derived geo query used by PlaceService
  List<Place> findByLocationNear(Point location, Distance distance);
}

@Service
public class UserService {

  @Autowired
  private UserRepository userRepository;

  public List<User> findNearbyFriends(String userId, Distance distance) {
    User user = userRepository.findById(userId)
      .orElseThrow(() -> new RuntimeException("User not found"));
    Point userLocation = new Point(user.getLocation().getX(), user.getLocation().getY());
    return userRepository.findByLocationNear(userLocation, distance);
  }
}

@Service
public class PlaceService {

  @Autowired
  private PlaceRepository placeRepository;

  // Used to look up the location of the user
  @Autowired
  private UserRepository userRepository;

  public List<Place> findNearbyPlaces(String userId, Distance distance) {
    User user = userRepository.findById(userId)
      .orElseThrow(() -> new RuntimeException("User not found"));
    Point userLocation = new Point(user.getLocation().getX(), user.getLocation().getY());
    return placeRepository.findByLocationNear(userLocation, distance);
  }
}





Tuesday, 16 May 2023

WireMock @WireMockTest Annotation

Introduction
The explanation is as follows
httpPort: To indicate on which port we will be able to make HTTP calls.
httpsEnabled and httpsPort: To indicate that we want to make HTTPS calls and on which port.
proxyMode: To emulate a domain name other than localhost. In this case and using HTTPClient we must use the useSystemProperties method when creating the client.
Example
We do it like this
@Log4j2
@WireMockTest(httpsEnabled = true, httpsPort = 9090, proxyMode = true)
public class WiremockBasicTest {
  String BEARER_TOKEN = "Bearer 77d9b8f0-fafe-3778-addf-2755bdc53c88";
  String JSON_CONTENT = "{\"hellow\":\"world\"}";

  @Test
  public void doGetAndGetResponse_proxyMode() throws Exception{
    stubFor(get("/sample").withHeader(HttpHeaders.AUTHORIZATION, WireMock.equalTo(BEARER_TOKEN))
      .withHost(WireMock.equalTo("mydomain.com"))
      .willReturn(aResponse().withBody(JSON_CONTENT).withStatus(200)));
    ...
  }
}

Friday, 12 May 2023

SpringKafka Consumer: Always Reading Topics from the Beginning

Introduction
We include the following line
import org.springframework.kafka.listener.ConsumerSeekAware;
There are 2 ways to always read topics from the very beginning

1. Setting enable-auto-commit=false
After connecting to Kafka, the classes annotated with @KafkaListener start being invoked.
If we want the listeners to always start from the beginning of the topics, we do it like this
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
2. Using ConsumerSeekAware
We include the following line
import org.springframework.kafka.listener.ConsumerSeekAware;
onPartitionsAssigned method
The explanation is as follows
This method is called when Kafka topic partitions are assigned to the consumer group. It provides a map of assigned partitions along with their current offsets. The ConsumerSeekCallback parameter allows you to manually control the seeking behavior of the consumer.

Example - seekToBeginning()
This can be used if we want the listeners to always start from the beginning of the topics. We do it like this
@Component
@Slf4j
public class FromBeginningKafkaMessageConsumer implements ConsumerSeekAware {
  @Override
  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
    ConsumerSeekCallback callback) {
    // Seek to the beginning of each assigned partition
    assignments.keySet().forEach(
      topicPartition -> callback.seekToBeginning(topicPartition.topic(),
                                                 topicPartition.partition()));

  }

  @KafkaListener(topics = "...")
  public void listen(ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    log.info("Received key : {} value : {}", key, value);
    // Process the message as per your requirement
  }
}
Example - Going to a Given Offset with seek()
We do it like this
@Service
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
  // Annotation attributes require compile-time constants
  private static final long OFFSET = 777L; //your offset number
  private static final String TOPIC = "TopicName"; //your topic name
  private static final String LISTENER_ID = "listenerId"; //your listener id

  //id of this listener has to be remembered for further
  //use in method getListenerContainer of KafkaListenerEndpointRegistry
  @KafkaListener(id = LISTENER_ID,
                 groupId = "groupName",
                 topics = TOPIC)
  public void listenServiceCall(@Payload String message,
                                @Header(KafkaHeaders.OFFSET) Long offset) {
    ...
  }

  @Override
  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, 
                                   ConsumerSeekCallback callback) {
    // seek() expects the partition number, not the TopicPartition object
    assignments.keySet().forEach(partition ->
      callback.seek(partition.topic(), partition.partition(), OFFSET));
  }
}


Wednesday, 10 May 2023

SpringData MongoDB Query Class

Example
We do it like this
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

// ...

String[] tags = {"fiction", "novel"};

Query query = new Query();
query.addCriteria(Criteria.where("tags").all(tags));

List<Book> books = mongoTemplate.find(query, Book.class);

SpringData MongoDB Reactive ReactiveMongoRepository Interface

Introduction
Some of the methods are as follows
Flux<T> findAll()
Mono<T> save(T entity)
Mono<T> findById(ID id) 
Mono<Void> deleteById(ID id)
Usage
Example
We do it like this
@RestController
@RequestMapping("/employees")
public class EmployeeController {
  private final EmployeeRepository employeeRepository;
  public EmployeeController(EmployeeRepository employeeRepository) {
    this.employeeRepository = employeeRepository;
  }
  @GetMapping
  public Flux<Employee> getAllEmployees() {
    return employeeRepository.findAll();
  }
  @PostMapping
  public Mono<Employee> createEmployee(@RequestBody Employee employee) {
    return employeeRepository.save(employee);
  }
  @GetMapping("/{id}")
  public Mono<Employee> getEmployeeById(@PathVariable String id) {
    return employeeRepository.findById(id);
  }
  @PutMapping("/{id}")
  public Mono<Employee> updateEmployee(@PathVariable String id,
                                             @RequestBody Employee employee) {
      return employeeRepository.findById(id)
        .flatMap(existingEmployee -> {
          existingEmployee.setName(employee.getName());
          existingEmployee.setDepartment(employee.getDepartment());
          return employeeRepository.save(existingEmployee);
        });
    }
  @DeleteMapping("/{id}")
  public Mono<Void> deleteEmployee(@PathVariable String id) {
    return employeeRepository.deleteById(id);
  }
}
Pagination
Example
We do it like this
@Repository
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {
    Flux<Employee> findByName(String name, Pageable pageable);
}

@RestController
@RequestMapping("/employees")
public class EmployeeController {
  // existing code
  @GetMapping("/search")
  public Flux<Employee> searchEmployees(@RequestParam("name") String name,
                            @RequestParam(value = "page", defaultValue = "0") int page,
                            @RequestParam(value = "size", defaultValue = "10") int size) {
    PageRequest pageable = PageRequest.of(page, size, Sort.by("name"));
    return employeeRepository.findByName(name, pageable);
  }
}
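The page and size parameters above are zero-based: page 0 is the first page, and each page holds at most size elements. The sketch below shows the same arithmetic on an in-memory list, without any Spring types. PageSketch is a hypothetical class, and sorting by natural order stands in for Sort.by("name").

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration of zero-based page/size semantics; no Spring types used.
class PageSketch {

  // Sorts the names, skips page * size elements and keeps at most size elements.
  static List<String> page(List<String> names, int page, int size) {
    return names.stream()
        .sorted(Comparator.naturalOrder())
        .skip((long) page * size)
        .limit(size)
        .collect(Collectors.toList());
  }
}
```

With five names and size 2, pages 0 and 1 are full, page 2 holds the single remaining name, and page 3 is empty.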
flatMap method
Example
We do it like this. Here it is used for an update
@PutMapping("/{id}")
public Mono<Employee> updateEmployee(@PathVariable String id,
                                     @RequestBody Employee employee) {
  return employeeRepository.findById(id)
    .flatMap(existingEmployee -> {
      existingEmployee.setName(employee.getName());
      existingEmployee.setDepartment(employee.getDepartment());
      return employeeRepository.save(existingEmployee);
    });
}
Example
Suppose we have two repositories that extend ReactiveCrudRepository
@Autowired
private CurrencyRepository currencyRepository;

@Autowired
private CurrencyArchiveRepository currencyArchiveRepository;
We do it like this
@Override
@Transactional
public Mono<Void> delete(final String currencyCode) {
  final CurrencyArchive currencyArchive = buildCurrencyArchive();

  return this.currencyArchiveRepository.save(currencyArchive)
      .flatMap(c -> this.currencyRepository.deleteById(c.getCode()))
      .then();
}
Usage with @Query
The explanation is as follows
‘?0’ indicates that the mentioned property should be equal to the zeroth parameter to the query method. Further, if there were more parameters, they could be referred to ?1, ?2, and so forth.
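How positional placeholders line up with method parameters can be illustrated with a simplified sketch. This is not how Spring Data binds parameters internally (the real binding is more involved than string substitution). PlaceholderSketch and its rendering rules are assumptions made only to show which argument each ?N refers to.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified illustration of positional placeholders; Spring Data's real
// parameter binding is more involved than plain string substitution.
class PlaceholderSketch {

  private static final Pattern PLACEHOLDER = Pattern.compile("\\?(\\d+)");

  // Replaces each ?N with a JSON-like rendering of the N-th argument.
  static String bind(String query, Object... args) {
    Matcher matcher = PLACEHOLDER.matcher(query);
    StringBuffer out = new StringBuffer();
    while (matcher.find()) {
      int index = Integer.parseInt(matcher.group(1));
      matcher.appendReplacement(out, Matcher.quoteReplacement(render(args[index])));
    }
    matcher.appendTail(out);
    return out.toString();
  }

  // Lists become [a, b], strings are single-quoted, everything else uses toString.
  private static String render(Object value) {
    if (value instanceof List<?>) {
      return ((List<?>) value).stream()
          .map(PlaceholderSketch::render)
          .collect(Collectors.joining(", ", "[", "]"));
    }
    if (value instanceof String) {
      return "'" + value + "'";
    }
    return String.valueOf(value);
  }
}
```

Calling bind("{ 'skills': { $all: ?0 } }", List.of("java", "kotlin")) shows the zeroth argument taking the place of ?0 in the query document.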
Example
Suppose we have a repository like this
import java.util.List;

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

@Repository
public interface FreelancerRepository extends ReactiveMongoRepository<Freelancer, String> {

    @Query("{ 'skills': { $all: ?0 } }")
    Flux<Freelancer> findBySkillsAll(List<String> skills);

    Flux<Freelancer> findBySkillsIn(List<String> skills);

}
We do it like this
@Service
public class FreelancerService {

  @Autowired
  private FreelancerRepository repository;

  public Flux<Freelancer> findBySkillsOne(final List<String> skills){
    return this.repository.findBySkillsIn(skills);
  }

  public Flux<Freelancer> findBySkillsAll(final List<String> skills){
    return this.repository.findBySkillsAll(skills);
  }

  public Mono<Freelancer> getPerson(final String id){
    return this.repository.findById(id);
  }

  public Mono<Freelancer> savePerson(final Freelancer person){
    return this.repository.save(person);
  }

  public Mono<Freelancer> updatePerson(final Freelancer person){
    return this.repository.findById(person.getId())
      .map(p -> person)
      .flatMap(this.repository::save);
  }

  public Mono<Void> deletePerson(final String id){
    return this.repository.deleteById(id);
  }
}